/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.groups;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.VariableImpact;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.LocalPort;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterReference;
import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.parameter.StandardParameterUpdate;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.BatchSize;
import org.apache.nifi.registry.flow.Bundle;
import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.ConnectableComponent;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedFlowStatus;
import org.apache.nifi.registry.flow.VersionedFunnel;
import org.apache.nifi.registry.flow.VersionedLabel;
import org.apache.nifi.registry.flow.VersionedParameter;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.VersionedPort;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedPropertyDescriptor;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.EvolvingDifferenceDescriptor;
import org.apache.nifi.registry.flow.diff.FlowComparator;
import org.apache.nifi.registry.flow.diff.FlowComparison;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FlowDifferenceFilters;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.SnippetUtils;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.VersionedFlowDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ConnectException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

public final class StandardProcessGroup implements ProcessGroup {

    // Identifier of this group; assigned once in the constructor.
    private final String id;
    // Descriptive state held in AtomicReferences; the accessors for these fields read and write them without taking the group lock.
    private final AtomicReference<ProcessGroup> parent;
    private final AtomicReference<String> name;
    private final AtomicReference<Position> position;
    private final AtomicReference<String> comments;
    private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
    private final AtomicReference<StandardVersionControlInformation> versionControlInfo = new AtomicReference<>();
    private static final SecureRandom randomGenerator = new SecureRandom();

    // Framework collaborators supplied through the constructor (flowManager is obtained from flowController).
    private final StandardProcessScheduler scheduler;
    private final ControllerServiceProvider controllerServiceProvider;
    private final FlowController flowController;
    private final FlowManager flowManager;

    // Components contained directly in this group. The port, process group and remote group maps are keyed by
    // component identifier; NOTE(review): the remaining maps are presumably keyed the same way - confirm where they are populated.
    private final Map<String, Port> inputPorts = new HashMap<>();
    private final Map<String, Port> outputPorts = new HashMap<>();
    private final Map<String, Connection> connections = new HashMap<>();
    private final Map<String, ProcessGroup> processGroups = new HashMap<>();
    private final Map<String, Label> labels = new HashMap<>();
    private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>();
    private final Map<String, ProcessorNode> processors = new HashMap<>();
    private final Map<String, Funnel> funnels = new HashMap<>();
    private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>();
    private final Map<String, Template> templates = new HashMap<>();
    private final StringEncryptor encryptor;
    private final MutableVariableRegistry variableRegistry;
    private final VersionControlFields versionControlFields = new VersionControlFields();
    private volatile ParameterContext parameterContext;

    // FlowFile concurrency / outbound policy settings with their initial defaults.
    private FlowFileConcurrency flowFileConcurrency = FlowFileConcurrency.UNBOUNDED;
    private volatile FlowFileGate flowFileGate = new UnboundedFlowFileGate();
    private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE;

    // Read/write lock pair taken by the methods that read or mutate the (non-thread-safe) component maps above.
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();

    private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class);

    /**
     * Creates an empty Process Group that has no parent and no name, empty comments and a position of (0, 0).
     *
     * @param id the identifier of the new group
     * @param serviceProvider the controller service provider this group keeps a reference to
     * @param scheduler the process scheduler this group keeps a reference to
     * @param nifiProps not read by this constructor
     * @param encryptor the encryptor this group keeps a reference to
     * @param flowController the owning controller; its FlowManager is captured here, so it must not be {@code null}
     * @param variableRegistry the variable registry of this group
     */
    public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final StandardProcessScheduler scheduler,
                                final NiFiProperties nifiProps, final StringEncryptor encryptor, final FlowController flowController,
                                final MutableVariableRegistry variableRegistry) {
        // Framework collaborators
        this.flowController = flowController;
        this.flowManager = flowController.getFlowManager();
        this.controllerServiceProvider = serviceProvider;
        this.scheduler = scheduler;
        this.encryptor = encryptor;
        this.variableRegistry = variableRegistry;

        // Identity and descriptive state
        this.id = id;
        this.parent = new AtomicReference<>();
        this.name = new AtomicReference<>();
        this.comments = new AtomicReference<>("");
        this.position = new AtomicReference<>(new Position(0D, 0D));
    }

    /**
     * Returns the group that currently contains this one.
     *
     * @return the parent Process Group, or {@code null} if no parent is set
     */
    @Override
    public ProcessGroup getParent() {
        final ProcessGroup currentParent = this.parent.get();
        return currentParent;
    }

    /**
     * Walks up the parent chain and returns the top-most group, i.e. the first ancestor (or this group itself)
     * whose parent is {@code null}.
     *
     * <p>Each parent is read exactly once per step. Reading it twice (once for the loop test and once for the
     * assignment) would allow a concurrent {@code setParent(null)} to slip in between the two reads and leave a
     * {@code null} in the loop variable, causing a NullPointerException on the next iteration.</p>
     *
     * @return the root of the hierarchy this group belongs to; never {@code null}
     */
    private ProcessGroup getRoot() {
        ProcessGroup root = this;
        ProcessGroup ancestor = root.getParent();
        while (ancestor != null) {
            root = ancestor;
            ancestor = root.getParent();
        }
        return root;
    }

    /**
     * Replaces the parent of this group.
     *
     * @param newParent the group that now contains this one; stored as-is
     */
    @Override
    public void setParent(final ProcessGroup newParent) {
        this.parent.set(newParent);
    }

    /**
     * Returns the authorizable from which this group's policies are inherited, which is its parent group.
     *
     * @return the parent Process Group, or {@code null} if no parent is set
     */
    @Override
    public Authorizable getParentAuthorizable() {
        final ProcessGroup parentGroup = getParent();
        return parentGroup;
    }

    /**
     * Builds the authorization resource describing this group from its identifier and name.
     *
     * @return a component resource of type {@code ProcessGroup}
     */
    @Override
    public Resource getResource() {
        final String groupId = getIdentifier();
        final String groupName = getName();
        return ResourceFactory.getComponentResource(ResourceType.ProcessGroup, groupId, groupName);
    }

    /**
     * Returns the identifier given to this group at construction time.
     *
     * @return the identifier of this group
     */
    @Override
    public String getIdentifier() {
        return this.id;
    }

    /**
     * Returns the identifier of the group that contains this one.
     *
     * @return the parent's identifier, or {@code null} if this group has no parent
     */
    @Override
    public String getProcessGroupIdentifier() {
        final ProcessGroup container = getParent();
        return (container == null) ? null : container.getIdentifier();
    }

    /**
     * Returns the current name of this group.
     *
     * @return the name, or {@code null} if none has been set yet
     */
    @Override
    public String getName() {
        final String currentName = this.name.get();
        return currentName;
    }

    /**
     * Sets the name of this group.
     *
     * @param name the new name; must contain at least one non-whitespace character
     * @throws IllegalArgumentException if {@code name} is null, empty or whitespace only
     */
    @Override
    public void setName(final String name) {
        final boolean nameMissing = StringUtils.isBlank(name);
        if (nameMissing) {
            throw new IllegalArgumentException("The name of the process group must be specified.");
        }

        this.name.set(name);
    }

    /**
     * Replaces the position of this group.
     *
     * @param position the new position; stored as-is
     */
    @Override
    public void setPosition(final Position position) {
        final AtomicReference<Position> positionRef = this.position;
        positionRef.set(position);
    }

    /**
     * Returns the current position of this group.
     *
     * @return the most recently stored position
     */
    @Override
    public Position getPosition() {
        final Position currentPosition = this.position.get();
        return currentPosition;
    }

    /**
     * Returns the comments attached to this group.
     *
     * @return the most recently stored comments (initially the empty string)
     */
    @Override
    public String getComments() {
        final String currentComments = comments.get();
        return currentComments;
    }

    /**
     * Replaces the comments attached to this group.
     *
     * @param comments the new comments; stored as-is
     */
    @Override
    public void setComments(final String comments) {
        final AtomicReference<String> commentsRef = this.comments;
        commentsRef.set(comments);
    }

    /**
     * Computes a snapshot of the component counts for this group while holding the read lock.
     *
     * <ul>
     * <li>Local/public input and output port counts cover only the ports held directly by this group.</li>
     * <li>Running, stopped, invalid and disabled counts cover this group's processors and ports plus the
     * corresponding counts reported by each child group.</li>
     * <li>Version control counts (up to date, locally modified, stale, locally modified and stale, sync failure)
     * cover each versioned child group plus the counts each child reports for its own nested groups.</li>
     * <li>Active/inactive remote port counts are taken from the groups returned by
     * {@code findAllRemoteProcessGroups()}; a remote group with an authorization issue also counts as invalid.</li>
     * </ul>
     *
     * <p>If the version control status of a child group cannot be determined, that child is counted as a sync
     * failure and the counts of its nested groups are still aggregated.</p>
     *
     * @return the aggregated counts
     */
    @Override
    public ProcessGroupCounts getCounts() {
        int localInputPortCount = 0;
        int localOutputPortCount = 0;
        int publicInputPortCount = 0;
        int publicOutputPortCount = 0;

        int running = 0;
        int stopped = 0;
        int invalid = 0;
        int disabled = 0;
        int activeRemotePorts = 0;
        int inactiveRemotePorts = 0;

        int upToDate = 0;
        int locallyModified = 0;
        int stale = 0;
        int locallyModifiedAndStale = 0;
        int syncFailure = 0;

        readLock.lock();
        try {
            // Each component falls into exactly one state bucket; DISABLED takes precedence over running/invalid.
            for (final ProcessorNode procNode : processors.values()) {
                if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
                    disabled++;
                } else if (procNode.isRunning()) {
                    running++;
                } else if (procNode.getValidationStatus() == ValidationStatus.INVALID) {
                    invalid++;
                } else {
                    stopped++;
                }
            }

            for (final Port port : inputPorts.values()) {
                if (port instanceof PublicPort) {
                    publicInputPortCount++;
                } else {
                    localInputPortCount++;
                }
                if (ScheduledState.DISABLED.equals(port.getScheduledState())) {
                    disabled++;
                } else if (port.isRunning()) {
                    running++;
                } else if (!port.isValid()) {
                    invalid++;
                } else {
                    stopped++;
                }
            }

            for (final Port port : outputPorts.values()) {
                if (port instanceof PublicPort) {
                    publicOutputPortCount++;
                } else {
                    localOutputPortCount++;
                }
                if (ScheduledState.DISABLED.equals(port.getScheduledState())) {
                    disabled++;
                } else if (port.isRunning()) {
                    running++;
                } else if (!port.isValid()) {
                    invalid++;
                } else {
                    stopped++;
                }
            }

            for (final ProcessGroup childGroup : processGroups.values()) {
                final ProcessGroupCounts childCounts = childGroup.getCounts();
                running += childCounts.getRunningCount();
                stopped += childCounts.getStoppedCount();
                invalid += childCounts.getInvalidCount();
                disabled += childCounts.getDisabledCount();

                // update the vci counts for this child group
                final VersionControlInformation vci = childGroup.getVersionControlInformation();
                if (vci != null) {
                    VersionedFlowStatus flowStatus = null;
                    boolean statusResolved = false;
                    try {
                        flowStatus = vci.getStatus();
                        statusResolved = true;
                    } catch (final Exception e) {
                        // The status that could not be determined belongs to the child group, so that is what gets logged.
                        LOG.warn("Could not determine Version Control State for {}. Will consider state to be SYNC_FAILURE", childGroup, e);
                        syncFailure++;
                    }

                    // Only classify the child when its status was obtained; on failure we fall through (rather than
                    // skipping the rest of the iteration) so the nested group counts below are still added.
                    if (statusResolved) {
                        switch (flowStatus.getState()) {
                            case LOCALLY_MODIFIED:
                                locallyModified++;
                                break;
                            case LOCALLY_MODIFIED_AND_STALE:
                                locallyModifiedAndStale++;
                                break;
                            case STALE:
                                stale++;
                                break;
                            case SYNC_FAILURE:
                                syncFailure++;
                                break;
                            case UP_TO_DATE:
                                upToDate++;
                                break;
                        }
                    }
                }

                // update the vci counts for all nested groups within the child
                upToDate += childCounts.getUpToDateCount();
                locallyModified += childCounts.getLocallyModifiedCount();
                stale += childCounts.getStaleCount();
                locallyModifiedAndStale += childCounts.getLocallyModifiedAndStaleCount();
                syncFailure += childCounts.getSyncFailureCount();
            }

            // NOTE(review): if findAllRemoteProcessGroups() also returns remote groups of descendant groups, their
            // authorization issues are counted here and again through childCounts.getInvalidCount() - confirm.
            for (final RemoteProcessGroup remoteGroup : findAllRemoteProcessGroups()) {
                // Count only input ports that have incoming connections
                for (final Port port : remoteGroup.getInputPorts()) {
                    if (port.hasIncomingConnection()) {
                        if (port.isRunning()) {
                            activeRemotePorts++;
                        } else {
                            inactiveRemotePorts++;
                        }
                    }
                }

                // Count only output ports that have outgoing connections
                for (final Port port : remoteGroup.getOutputPorts()) {
                    if (!port.getConnections().isEmpty()) {
                        if (port.isRunning()) {
                            activeRemotePorts++;
                        } else {
                            inactiveRemotePorts++;
                        }
                    }
                }

                final String authIssue = remoteGroup.getAuthorizationIssue();
                if (authIssue != null) {
                    invalid++;
                }
            }
        } finally {
            readLock.unlock();
        }

        return new ProcessGroupCounts(localInputPortCount, localOutputPortCount, publicInputPortCount, publicOutputPortCount,
                running, stopped, invalid, disabled, activeRemotePorts,
                inactiveRemotePorts, upToDate, locallyModified, stale, locallyModifiedAndStale, syncFailure);
    }

    /**
     * Indicates whether this group is the top of its hierarchy.
     *
     * @return {@code true} if this group has no parent
     */
    @Override
    public boolean isRootGroup() {
        final ProcessGroup currentParent = parent.get();
        return currentParent == null;
    }

    /**
     * Starts every processor, input port and output port returned by the {@code findAll*} lookups that passes the
     * corresponding start filter, delegating each start to the component's own Process Group.
     *
     * <p>A failure to start one processor is logged and does not prevent the remaining components from being
     * started; failures while starting ports are not caught. The work is done under the read lock, after which
     * {@code onComponentModified()} is invoked.</p>
     */
    @Override
    public void startProcessing() {
        readLock.lock();
        try {
            for (final ProcessorNode processor : findAllProcessors()) {
                if (!START_PROCESSORS_FILTER.test(processor)) {
                    continue;
                }

                try {
                    processor.getProcessGroup().startProcessor(processor, true);
                } catch (final Throwable t) {
                    LOG.error("Unable to start processor {} due to {}", new Object[]{processor.getIdentifier(), t});
                }
            }

            for (final Port inputPort : findAllInputPorts()) {
                if (START_PORTS_FILTER.test(inputPort)) {
                    inputPort.getProcessGroup().startInputPort(inputPort);
                }
            }

            for (final Port outputPort : findAllOutputPorts()) {
                if (START_PORTS_FILTER.test(outputPort)) {
                    outputPort.getProcessGroup().startOutputPort(outputPort);
                }
            }
        } finally {
            readLock.unlock();
        }

        onComponentModified();
    }

    /**
     * Stops every processor, input port and output port returned by the {@code findAll*} lookups that passes the
     * corresponding stop filter, delegating each stop to the component's own Process Group.
     *
     * <p>A failure to stop one processor is logged and does not prevent the remaining components from being
     * stopped; failures while stopping ports are not caught. The work is done under the read lock, after which
     * {@code onComponentModified()} is invoked.</p>
     */
    @Override
    public void stopProcessing() {
        readLock.lock();
        try {
            for (final ProcessorNode processor : findAllProcessors()) {
                if (!STOP_PROCESSORS_FILTER.test(processor)) {
                    continue;
                }

                try {
                    processor.getProcessGroup().stopProcessor(processor);
                } catch (final Throwable t) {
                    LOG.error("Unable to stop processor {} due to {}", new Object[]{processor.getIdentifier(), t});
                }
            }

            for (final Port inputPort : findAllInputPorts()) {
                if (STOP_PORTS_FILTER.test(inputPort)) {
                    inputPort.getProcessGroup().stopInputPort(inputPort);
                }
            }

            for (final Port outputPort : findAllOutputPorts()) {
                if (STOP_PORTS_FILTER.test(outputPort)) {
                    outputPort.getProcessGroup().stopOutputPort(outputPort);
                }
            }
        } finally {
            readLock.unlock();
        }

        onComponentModified();
    }

    /**
     * Looks up the state manager for a component through the flow controller's state manager provider.
     *
     * @param componentId the identifier of the component
     * @return whatever the provider returns for {@code componentId}
     */
    private StateManager getStateManager(final String componentId) {
        final StateManagerProvider provider = flowController.getStateManagerProvider();
        return provider.getStateManager(componentId);
    }

    /**
     * Shuts down the contents of the given group and then, recursively, of each of its child groups.
     *
     * <p>For the given group this invokes the {@code @OnShutdown} methods of every processor (with the processor's
     * NAR class loader in effect), shuts down every Remote Process Group and stops load balancing on the queue of
     * every connection.</p>
     *
     * @param procGroup the group whose contents are shut down
     */
    private void shutdown(final ProcessGroup procGroup) {
        for (final ProcessorNode procNode : procGroup.getProcessors()) {
            try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), procNode.getProcessor().getClass(), procNode.getIdentifier())) {
                final StandardProcessContext shutdownContext =
                        new StandardProcessContext(procNode, controllerServiceProvider, encryptor, getStateManager(procNode.getIdentifier()), () -> false);
                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, procNode.getProcessor(), shutdownContext);
            }
        }

        for (final RemoteProcessGroup remoteGroup : procGroup.getRemoteProcessGroups()) {
            remoteGroup.shutdown();
        }

        for (final Connection conn : procGroup.getConnections()) {
            final FlowFileQueue queue = conn.getFlowFileQueue();
            queue.stopLoadBalancing();
        }

        // Descend into the child groups last.
        for (final ProcessGroup childGroup : procGroup.getProcessGroups()) {
            shutdown(childGroup);
        }
    }

    /**
     * Shuts down the contents of this group and of all of its descendant groups while holding the read lock.
     */
    @Override
    public void shutdown() {
        final Lock lock = readLock;
        lock.lock();
        try {
            shutdown(this);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Verifies that neither the identifier nor the name of the given port is already in use.
     *
     * @param port the port about to be added; must not be {@code null}
     * @param portIdMap the existing ports, keyed by identifier
     * @param getPortByName lookup that returns the existing port with a given name, or {@code null} if there is none
     * @throws NullPointerException if {@code port} is {@code null}
     * @throws IllegalStateException if a port with the same identifier or the same name already exists
     */
    private void verifyPortUniqueness(final Port port,
                                      final Map<String, Port> portIdMap,
                                      final Function<String, Port> getPortByName) {

        final String candidateId = requireNonNull(port).getIdentifier();
        if (portIdMap.containsKey(candidateId)) {
            throw new IllegalStateException("A port with the same id already exists.");
        }

        final Port sameNamedPort = getPortByName.apply(port.getName());
        if (sameNamedPort != null) {
            throw new IllegalStateException("A port with the same name already exists.");
        }
    }

    /**
     * Adds an Input Port to this group.
     *
     * <p>Under the write lock the port is checked for identifier/name uniqueness, assigned to this group,
     * registered, and the flow manager is notified.</p>
     *
     * @param port the port to add
     * @throws IllegalArgumentException if this is the root group and the port is not a {@code PublicPort}
     * @throws IllegalStateException if a port with the same identifier or name already exists
     */
    @Override
    public void addInputPort(final Port port) {
        // The root group accepts public ports only.
        if (isRootGroup() && !(port instanceof PublicPort)) {
            throw new IllegalArgumentException("Cannot add Input Port of type " + port.getClass().getName() + " to the Root Group");
        }

        writeLock.lock();
        try {
            // Rejects a null port as well as duplicate identifiers/names within this group.
            verifyPortUniqueness(port, inputPorts, this::getInputPortByName);

            port.setProcessGroup(this);
            inputPorts.put(port.getIdentifier(), port);
            flowManager.onInputPortAdded(port);
            onComponentModified();

            LOG.info("Input Port {} added to {}", port, this);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Removes an Input Port from this group together with all of the port's connections.
     *
     * <p>Everything is verified as deletable first; then the port is stopped if it is running, its connections
     * are removed, the port is unregistered and the scheduler and flow manager are notified. All of this happens
     * under the write lock.</p>
     *
     * @param port the port to remove
     * @throws NullPointerException if {@code port} is {@code null}
     * @throws IllegalStateException if the port is not an Input Port of this group
     */
    @Override
    public void removeInputPort(final Port port) {
        writeLock.lock();
        try {
            final String portId = requireNonNull(port).getIdentifier();
            final Port registered = inputPorts.get(portId);
            if (registered == null) {
                throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group");
            }

            // Verify the port and each of its connections before changing anything.
            port.verifyCanDelete();
            for (final Connection connection : port.getConnections()) {
                connection.verifyCanDelete();
            }

            if (port.isRunning()) {
                stopInputPort(port);
            }

            // Iterate over a snapshot: removing a connection alters the port's own connection collection.
            final Set<Connection> connectionSnapshot = new HashSet<>(port.getConnections());
            for (final Connection connection : connectionSnapshot) {
                removeConnection(connection);
            }

            final Port removedPort = inputPorts.remove(port.getIdentifier());
            if (removedPort == null) {
                throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group");
            }

            scheduler.onPortRemoved(port);
            onComponentModified();

            flowManager.onInputPortRemoved(port);
            LOG.info("Input Port {} removed from flow", port);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Looks up an Input Port of this group by identifier while holding the read lock.
     *
     * @param id the identifier of the port; must not be {@code null}
     * @return the matching port, or {@code null} if this group has no such Input Port
     * @throws NullPointerException if {@code id} is {@code null}
     */
    @Override
    public Port getInputPort(final String id) {
        readLock.lock();
        try {
            final String portId = requireNonNull(id);
            return inputPorts.get(portId);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the Input Ports held directly by this group.
     *
     * @return a new set, built under the read lock, containing the current Input Ports
     */
    @Override
    public Set<Port> getInputPorts() {
        readLock.lock();
        try {
            final Set<Port> snapshot = new HashSet<>(inputPorts.values());
            return snapshot;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Adds an Output Port to this group.
     *
     * <p>Under the write lock the port is checked for identifier/name uniqueness, assigned to this group,
     * registered, and the flow manager is notified.</p>
     *
     * @param port the port to add
     * @throws IllegalArgumentException if this is the root group and the port is not a {@code PublicPort}
     * @throws IllegalStateException if a port with the same identifier or name already exists
     */
    @Override
    public void addOutputPort(final Port port) {
        // The root group accepts public ports only.
        final boolean addingToRoot = isRootGroup();
        if (addingToRoot && !(port instanceof PublicPort)) {
            throw new IllegalArgumentException("Cannot add Output Port " + port.getClass().getName() + " to the Root Group");
        }

        writeLock.lock();
        try {
            // Rejects a null port as well as duplicate identifiers/names within this group.
            verifyPortUniqueness(port, outputPorts, this::getOutputPortByName);

            port.setProcessGroup(this);
            final String portId = port.getIdentifier();
            outputPorts.put(portId, port);
            flowManager.onOutputPortAdded(port);
            onComponentModified();

            LOG.info("Output Port {} added to {}", port, this);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Removes an Output Port from this group.
     *
     * <p>All preconditions are verified before any state is changed: the port must belong to this group, must be
     * deletable and must have no connections. Only then is the port stopped (if running), unregistered, and the
     * scheduler and flow manager notified. All of this happens under the write lock.</p>
     *
     * @param port the port to remove
     * @throws NullPointerException if {@code port} is {@code null}
     * @throws IllegalStateException if the port is not an Output Port of this group or still has connections
     */
    @Override
    public void removeOutputPort(final Port port) {
        writeLock.lock();
        try {
            final Port toRemove = outputPorts.get(requireNonNull(port).getIdentifier());
            if (toRemove == null) {
                // Report an unknown port explicitly instead of failing with a NullPointerException below.
                throw new IllegalStateException(port.getIdentifier() + " is not an Output Port of this Process Group");
            }

            toRemove.verifyCanDelete();

            // Check the connections before stopping the port so that a rejected removal leaves the port untouched.
            if (!toRemove.getConnections().isEmpty()) {
                throw new IllegalStateException(port.getIdentifier() + " cannot be removed until its connections are removed");
            }

            if (port.isRunning()) {
                stopOutputPort(port);
            }

            final Port removed = outputPorts.remove(port.getIdentifier());
            if (removed == null) {
                throw new IllegalStateException(port.getIdentifier() + " is not an Output Port of this Process Group");
            }

            scheduler.onPortRemoved(port);
            onComponentModified();

            flowManager.onOutputPortRemoved(port);
            LOG.info("Output Port {} removed from flow", port);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Looks up an Output Port of this group by identifier while holding the read lock.
     *
     * @param id the identifier of the port; must not be {@code null}
     * @return the matching port, or {@code null} if this group has no such Output Port
     * @throws NullPointerException if {@code id} is {@code null}
     */
    @Override
    public Port getOutputPort(final String id) {
        readLock.lock();
        try {
            final String portId = requireNonNull(id);
            return outputPorts.get(portId);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the Output Ports held directly by this group.
     *
     * @return a new set, built under the read lock, containing the current Output Ports
     */
    @Override
    public Set<Port> getOutputPorts() {
        readLock.lock();
        try {
            final Set<Port> snapshot = new HashSet<>(outputPorts.values());
            return snapshot;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Adds a child Process Group to this group.
     *
     * <p>Under the write lock the child's parent and the parent of its variable registry are pointed at this
     * group, the child is registered, the flow manager is notified, and the controller service references of
     * every controller service and processor found in the child are updated.</p>
     *
     * @param group the group to add; must not be {@code null}
     * @throws IllegalArgumentException if the group's name is null or empty
     */
    @Override
    public void addProcessGroup(final ProcessGroup group) {
        final String childName = group.getName();
        if (StringUtils.isEmpty(childName)) {
            throw new IllegalArgumentException("Process Group's name must be specified");
        }

        writeLock.lock();
        try {
            group.setParent(this);
            group.getVariableRegistry().setParent(getVariableRegistry());

            processGroups.put(group.getIdentifier(), group);
            flowManager.onProcessGroupAdded(group);

            for (final ControllerServiceNode serviceNode : group.findAllControllerServices()) {
                updateControllerServiceReferences(serviceNode);
            }
            for (final ProcessorNode processorNode : group.findAllProcessors()) {
                updateControllerServiceReferences(processorNode);
            }

            onComponentModified();

            LOG.info("{} added to {}", group, this);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Looks up a direct child group by identifier while holding the read lock.
     *
     * @param id the identifier of the child group
     * @return the matching child group, or {@code null} if this group has no such child
     */
    @Override
    public ProcessGroup getProcessGroup(final String id) {
        readLock.lock();
        try {
            final ProcessGroup child = processGroups.get(id);
            return child;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the direct child groups of this group.
     *
     * @return a new set, built under the read lock, containing the current child groups
     */
    @Override
    public Set<ProcessGroup> getProcessGroups() {
        readLock.lock();
        try {
            final Set<ProcessGroup> snapshot = new HashSet<>(processGroups.values());
            return snapshot;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Removes a direct child group, including everything it contains, from this group.
     *
     * <p>Deletability is verified once before the write lock is taken and once more on the registered instance
     * after it has been acquired. The child's components are then removed, the child is unregistered and the
     * flow manager is notified.</p>
     *
     * @param group the child group to remove; must not be {@code null}
     * @throws NullPointerException if {@code group} is {@code null}
     * @throws IllegalStateException if the group is not a direct child of this group
     */
    @Override
    public void removeProcessGroup(final ProcessGroup group) {
        requireNonNull(group).verifyCanDelete();

        writeLock.lock();
        try {
            final String childId = group.getIdentifier();
            final ProcessGroup registered = processGroups.get(childId);
            if (registered == null) {
                throw new IllegalStateException(group.getIdentifier() + " is not a member of this Process Group");
            }
            registered.verifyCanDelete();

            removeComponents(group);
            processGroups.remove(group.getIdentifier());
            onComponentModified();

            flowManager.onProcessGroupRemoved(group);
            LOG.info("{} removed from flow", group);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Empties the given group by removing, in this order, its connections, input ports, output ports, funnels,
     * processors, remote process groups, labels, controller services and child groups.
     *
     * <p>Each category is removed through the group's own {@code remove*} method, except controller services,
     * which are removed through the flow controller's ControllerServiceProvider. Each collection is copied just
     * before it is processed so the removals cannot disturb the iteration.</p>
     *
     * @param group the group to empty
     */
    private void removeComponents(final ProcessGroup group) {
        final List<Connection> connectionsToRemove = new ArrayList<>(group.getConnections());
        connectionsToRemove.forEach(group::removeConnection);

        final List<Port> inputPortsToRemove = new ArrayList<>(group.getInputPorts());
        inputPortsToRemove.forEach(group::removeInputPort);

        final List<Port> outputPortsToRemove = new ArrayList<>(group.getOutputPorts());
        outputPortsToRemove.forEach(group::removeOutputPort);

        final List<Funnel> funnelsToRemove = new ArrayList<>(group.getFunnels());
        funnelsToRemove.forEach(group::removeFunnel);

        final List<ProcessorNode> processorsToRemove = new ArrayList<>(group.getProcessors());
        processorsToRemove.forEach(group::removeProcessor);

        final List<RemoteProcessGroup> remoteGroupsToRemove = new ArrayList<>(group.getRemoteProcessGroups());
        remoteGroupsToRemove.forEach(group::removeRemoteProcessGroup);

        final List<Label> labelsToRemove = new ArrayList<>(group.getLabels());
        labelsToRemove.forEach(group::removeLabel);

        for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) {
            // Must go through Controller Service here because we need to ensure that it is removed from the cache
            flowController.getControllerServiceProvider().removeControllerService(serviceNode);
        }

        final List<ProcessGroup> childGroupsToRemove = new ArrayList<>(group.getProcessGroups());
        childGroupsToRemove.forEach(group::removeProcessGroup);
    }

    /**
     * Adds a Remote Process Group to this group under the write lock.
     *
     * @param remoteGroup the remote group to add; must not be {@code null}
     * @throws NullPointerException if {@code remoteGroup} is {@code null}
     * @throws IllegalStateException if a Remote Process Group with the same identifier is already registered
     */
    @Override
    public void addRemoteProcessGroup(final RemoteProcessGroup remoteGroup) {
        writeLock.lock();
        try {
            final String candidateId = requireNonNull(remoteGroup).getIdentifier();
            if (remoteGroups.containsKey(candidateId)) {
                throw new IllegalStateException("RemoteProcessGroup already exists with ID " + remoteGroup.getIdentifier());
            }

            remoteGroup.setProcessGroup(this);
            remoteGroups.put(remoteGroup.getIdentifier(), remoteGroup);
            onComponentModified();

            LOG.info("{} added to {}", remoteGroup, this);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Returns the Remote Process Groups registered directly with this Process Group.
     *
     * @return a newly created set, populated under the read lock, so callers may modify it freely
     */
    @Override
    public Set<RemoteProcessGroup> getRemoteProcessGroups() {
        readLock.lock();
        try {
            final Set<RemoteProcessGroup> snapshot = new HashSet<>(remoteGroups.values());
            return snapshot;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Removes the given Remote Process Group from this Process Group, along with every connection
     * attached to one of its output ports.
     * <p>
     * While holding the write lock this verifies that the remote group and those connections can be deleted,
     * removes the connections, invokes the remote group's {@code onRemove()} hook (failures are only logged),
     * notifies the scheduler about each removed port, submits a framework task that tells the state manager
     * provider the component is gone, and finally drops the remote group from the internal map.
     *
     * @param remoteProcessGroup the Remote Process Group to remove
     * @throws NullPointerException if {@code remoteProcessGroup} is null
     * @throws IllegalStateException if no Remote Process Group with that identifier is registered here
     */
    @Override
    public void removeRemoteProcessGroup(final RemoteProcessGroup remoteProcessGroup) {
        final String remoteGroupId = requireNonNull(remoteProcessGroup).getIdentifier();

        writeLock.lock();
        try {
            final RemoteProcessGroup remoteGroup = remoteGroups.get(remoteGroupId);
            if (remoteGroup == null) {
                throw new IllegalStateException(remoteProcessGroup.getIdentifier() + " is not a member of this Process Group");
            }

            // All verification happens before anything below is modified
            remoteGroup.verifyCanDelete();
            for (final RemoteGroupPort outputPort : remoteGroup.getOutputPorts()) {
                for (final Connection outgoing : outputPort.getConnections()) {
                    outgoing.verifyCanDelete();
                }
            }

            onComponentModified();

            for (final RemoteGroupPort outputPort : remoteGroup.getOutputPorts()) {
                // Iterate over a snapshot: removeConnection() updates the connection's source while we loop
                final Set<Connection> outgoingSnapshot = new HashSet<>(outputPort.getConnections());
                for (final Connection outgoing : outgoingSnapshot) {
                    removeConnection(outgoing);
                }
            }

            // Resource cleanup is best-effort; a failure must not abort the removal
            try {
                remoteGroup.onRemove();
            } catch (final Exception e) {
                LOG.warn("Failed to clean up resources for {} due to {}", remoteGroup, e);
            }

            remoteGroup.getInputPorts().forEach(scheduler::onPortRemoved);
            remoteGroup.getOutputPorts().forEach(scheduler::onPortRemoved);

            // State cleanup is handed to the scheduler as a framework task rather than run inline
            final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider();
            final Runnable stateCleanup = () -> stateManagerProvider.onComponentRemoved(remoteGroup.getIdentifier());
            scheduler.submitFrameworkTask(stateCleanup);

            remoteGroups.remove(remoteGroupId);
            LOG.info("{} removed from flow", remoteProcessGroup);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Adds the given processor to this Process Group while holding the write lock.
     * <p>
     * The processor is pointed at this group, its variable registry is parented to this group's registry,
     * the flow manager is notified, and its Controller Service references are re-evaluated.
     *
     * @param processor the processor to add
     * @throws NullPointerException if {@code processor} is null
     * @throws IllegalStateException if a processor with the same identifier is already registered here
     */
    @Override
    public void addProcessor(final ProcessorNode processor) {
        writeLock.lock();
        try {
            final String processorId = requireNonNull(processor).getIdentifier();
            if (processors.get(processorId) != null) {
                throw new IllegalStateException("A processor is already registered to this ProcessGroup with ID " + processorId);
            }

            // Wire the processor into this group before making it visible through the map
            processor.setProcessGroup(this);
            processor.getVariableRegistry().setParent(getVariableRegistry());
            processors.put(processorId, processor);

            flowManager.onProcessorAdded(processor);
            updateControllerServiceReferences(processor);
            onComponentModified();

            LOG.info("{} added to {}", processor, this);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Synchronizes Controller Service references for every property of the given component whose
     * descriptor identifies a Controller Service.
     * <p>
     * For each such property with a non-null effective value, the referenced service node is looked up.
     * If the service is accessible from this Process Group, the component is added as a reference on the
     * service; otherwise the component is removed from the service's references. Properties whose service
     * node cannot be found are skipped.
     *
     * @param component the component whose Controller Service references should be updated
     */
    private void updateControllerServiceReferences(final ComponentNode component) {
        for (final Map.Entry<PropertyDescriptor, String> property : component.getEffectivePropertyValues().entrySet()) {
            final String serviceId = property.getValue();
            if (serviceId == null) {
                continue;
            }

            final PropertyDescriptor descriptor = property.getKey();
            final Class<? extends ControllerService> serviceClass = descriptor.getControllerServiceDefinition();
            if (serviceClass == null) {
                // Not a Controller Service property
                continue;
            }

            final boolean accessible = isValidServiceReference(serviceId, serviceClass);
            final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(serviceId);
            if (serviceNode == null) {
                continue;
            }

            if (accessible) {
                serviceNode.addReference(component, descriptor);
            } else {
                serviceNode.removeReference(component, descriptor);
            }
        }
    }

    /**
     * Checks whether the given service ID is among the identifiers that the Controller Service provider
     * reports for the given service type and this Process Group's identifier.
     *
     * @param serviceId the identifier of the referenced Controller Service
     * @param serviceClass the Controller Service type required by the referencing property
     * @return {@code true} if the provider lists {@code serviceId} for this group, {@code false} otherwise
     */
    private boolean isValidServiceReference(final String serviceId, final Class<? extends ControllerService> serviceClass) {
        return controllerServiceProvider.getControllerServiceIdentifiers(serviceClass, getIdentifier()).contains(serviceId);
    }

    /**
     * Removes the given processor from this Process Group.
     * <p>
     * While holding the write lock this method:
     * <ol>
     *   <li>verifies that the processor and each connection returned by {@code processor.getConnections()} can be deleted;</li>
     *   <li>invokes the processor's {@code @OnRemoved} methods using the component's NAR class loader;</li>
     *   <li>removes the processor from the references of any Controller Service named by its properties;</li>
     *   <li>drops the processor from this group and notifies the scheduler and flow manager;</li>
     *   <li>detaches log observers and submits a framework task that reports the removal to the state manager provider;</li>
     *   <li>removes the connections returned by {@code processor.getConnections()}.</li>
     * </ol>
     * If all of that succeeds, the processor's log repository and instance class loader are released.
     * That final cleanup is best-effort: failures are logged and do not propagate.
     *
     * @param processor the processor to remove
     * @throws NullPointerException if {@code processor} is null
     * @throws IllegalStateException if the processor is not a member of this Process Group
     * @throws ComponentLifeCycleException if the block that invokes the {@code @OnRemoved} methods throws
     */
    @Override
    public void removeProcessor(final ProcessorNode processor) {
        boolean removed = false;
        final String id = requireNonNull(processor).getIdentifier();
        writeLock.lock();
        try {
            if (!processors.containsKey(id)) {
                throw new IllegalStateException(processor.getIdentifier() + " is not a member of this Process Group");
            }

            // All verification happens before anything below is modified
            processor.verifyCanDelete();
            for (final Connection conn : processor.getConnections()) {
                conn.verifyCanDelete();
            }

            try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), processor.getProcessor().getClass(), processor.getIdentifier())) {
                final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), () -> false);
                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
            } catch (final Exception e) {
                throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of processor with id " + processor.getIdentifier(), e);
            }

            // Release any Controller Service references held through the processor's properties
            for (final Map.Entry<PropertyDescriptor, String> entry : processor.getEffectivePropertyValues().entrySet()) {
                final PropertyDescriptor descriptor = entry.getKey();
                if (descriptor.getControllerServiceDefinition() != null) {
                    final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
                    if (value != null) {
                        final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value);
                        if (serviceNode != null) {
                            serviceNode.removeReference(processor, descriptor);
                        }
                    }
                }
            }

            processors.remove(id);
            onComponentModified();

            scheduler.onProcessorRemoved(processor);
            flowManager.onProcessorRemoved(processor);

            final LogRepository logRepository = LogRepositoryFactory.getRepository(processor.getIdentifier());
            if (logRepository != null) {
                logRepository.removeAllObservers();
            }

            // State cleanup is handed to the scheduler as a framework task rather than run inline
            final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider();
            scheduler.submitFrameworkTask(new Runnable() {
                @Override
                public void run() {
                    stateManagerProvider.onComponentRemoved(processor.getIdentifier());
                }
            });

            // must copy to avoid a concurrent modification
            final Set<Connection> copy = new HashSet<>(processor.getConnections());
            for (final Connection conn : copy) {
                removeConnection(conn);
            }

            removed = true;
            LOG.info("{} removed from flow", processor);
        } finally {
            try {
                if (removed) {
                    // Best-effort cleanup: each step runs independently so that one failure does not
                    // prevent the other, and every failure is logged instead of being silently dropped.
                    try {
                        LogRepositoryFactory.removeRepository(processor.getIdentifier());
                    } catch (final Throwable t) {
                        LOG.warn("Failed to remove log repository for {}", processor, t);
                    }

                    try {
                        flowController.getExtensionManager().removeInstanceClassLoader(id);
                    } catch (final Throwable t) {
                        LOG.warn("Failed to remove instance class loader for {}", processor, t);
                    }
                }
            } finally {
                // The lock must be released no matter what happened during cleanup
                writeLock.unlock();
            }
        }
    }

    /**
     * Returns the processors registered directly with this Process Group.
     *
     * @return a newly created list, populated under the read lock
     */
    @Override
    public Collection<ProcessorNode> getProcessors() {
        readLock.lock();
        try {
            final List<ProcessorNode> snapshot = new ArrayList<>(processors.values());
            return snapshot;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Looks up a processor registered directly with this Process Group.
     *
     * @param id the identifier of the processor
     * @return the matching processor, or {@code null} if none is registered under that identifier
     * @throws NullPointerException if {@code id} is null
     */
    @Override
    public ProcessorNode getProcessor(final String id) {
        readLock.lock();
        try {
            final String processorId = Objects.requireNonNull(id);
            return processors.get(processorId);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Tells whether the given connectable is an Input Port that {@code findInputPort} can resolve.
     *
     * @param connectable the component to inspect
     * @return {@code true} if its type is {@code INPUT_PORT} and {@code findInputPort} finds its identifier
     */
    private boolean isInputPort(final Connectable connectable) {
        // The lookup is only attempted when the type already matches
        return connectable.getConnectableType() == ConnectableType.INPUT_PORT
                && findInputPort(connectable.getIdentifier()) != null;
    }

    /**
     * Tells whether the given connectable is an Output Port that {@code findOutputPort} can resolve.
     *
     * @param connectable the component to inspect
     * @return {@code true} if its type is {@code OUTPUT_PORT} and {@code findOutputPort} finds its identifier
     */
    private boolean isOutputPort(final Connectable connectable) {
        // The lookup is only attempted when the type already matches
        return connectable.getConnectableType() == ConnectableType.OUTPUT_PORT
                && findOutputPort(connectable.getIdentifier()) != null;
    }

    /**
     * Takes ownership of the given connection: stores it in this group's connection map and then points
     * the connection at this Process Group. Unlike {@code addConnection}, no validation is performed and
     * the connection's source and destination are not touched.
     *
     * @param connection the connection to adopt
     */
    @Override
    public void inheritConnection(final Connection connection) {
        writeLock.lock();
        try {
            final String connectionId = connection.getIdentifier();
            connections.put(connectionId, connection);
            onComponentModified();
            connection.setProcessGroup(this);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Adds the given connection to this Process Group after validating that its source and destination
     * are placed in groups that this Process Group may connect.
     * <p>
     * The rules enforced below depend on what the source is:
     * <ul>
     *   <li>Input Port source: an Input Port destination must belong to a child group; any other
     *       destination requires both source and destination to be in this group.</li>
     *   <li>Output Port source: the source must belong to a child group; an Input Port destination must
     *       belong to a child group; any other destination must be in this group.</li>
     *   <li>Any other source: the source must be in this group; an Output Port destination must be in this
     *       group; an Input Port destination must belong to a child group; any other destination must be
     *       in this group.</li>
     * </ul>
     * On success the connection is pointed at this group, registered with its source and destination,
     * stored in the connection map, and reported to the flow manager.
     *
     * @param connection the connection to add
     * @throws NullPointerException if {@code connection} is null
     * @throws IllegalStateException if a connection with the same ID already exists in this group, or if
     *             the source/destination placement violates the rules above
     */
    @Override
    public void addConnection(final Connection connection) {
        writeLock.lock();
        try {
            final String id = requireNonNull(connection).getIdentifier();
            final Connection existingConnection = connections.get(id);
            if (existingConnection != null) {
                throw new IllegalStateException("Connection already exists with ID " + id);
            }

            final Connectable source = connection.getSource();
            final Connectable destination = connection.getDestination();
            final ProcessGroup sourceGroup = source.getProcessGroup();
            final ProcessGroup destinationGroup = destination.getProcessGroup();

            // verify that the connection is valid with respect to the source & destination groups
            if (isInputPort(source)) { // if source is an input port, its destination must be in the same group unless it's an input port
                if (isInputPort(destination)) { // if destination is input port, it must be in a child group.
                    if (!processGroups.containsKey(destinationGroup.getIdentifier())) {
                        throw new IllegalStateException("Cannot add Connection to Process Group because destination is an Input Port that does not belong to a child Process Group");
                    }
                } else if (sourceGroup != this || destinationGroup != this) {
                    throw new IllegalStateException("Cannot add Connection to Process Group because source and destination are not both in this Process Group");
                }
            } else if (isOutputPort(source)) {
                // if source is an output port, its group must be a child of this group, and its destination must be in this
                // group (processor/output port) or a child group (input port)
                if (!processGroups.containsKey(sourceGroup.getIdentifier())) {
                    throw new IllegalStateException("Cannot add Connection to Process Group because source is an Output Port that does not belong to a child Process Group");
                }

                if (isInputPort(destination)) {
                    if (!processGroups.containsKey(destinationGroup.getIdentifier())) {
                        throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input Port that does not belong to a child Process Group");
                    }
                } else if (destinationGroup != this) {
                    throw new IllegalStateException("Cannot add Connection to Process Group because its destination does not belong to this Process Group");
                }
            } else { // source is not a port
                if (sourceGroup != this) {
                    throw new IllegalStateException("Cannot add Connection to Process Group because the source does not belong to this Process Group");
                }

                if (isOutputPort(destination)) {
                    if (destinationGroup != this) {
                        throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Output Port but does not belong to this Process Group");
                    }
                } else if (isInputPort(destination)) {
                    if (!processGroups.containsKey(destinationGroup.getIdentifier())) {
                        throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input "
                                + "Port but the Input Port does not belong to a child Process Group");
                    }
                } else if (destinationGroup != this) {
                    throw new IllegalStateException("Cannot add Connection between " + source.getIdentifier() + " and " + destination.getIdentifier()
                            + " because they are in different Process Groups and neither is an Input Port or Output Port");
                }
            }

            // validation passed: wire the connection into this group and its endpoints
            connection.setProcessGroup(this);
            source.addConnection(connection);
            if (source != destination) {  // don't call addConnection twice if it's a self-looping connection.
                destination.addConnection(connection);
            }
            connections.put(connection.getIdentifier(), connection);
            flowManager.onConnectionAdded(connection);
            onComponentModified();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Looks up a connectable component registered directly with this Process Group. Processors are
     * consulted first, then input ports, output ports, and finally funnels.
     *
     * @param id the identifier of the component
     * @return the first match in the order above, or {@code null} if nothing matches
     */
    @Override
    public Connectable getConnectable(final String id) {
        readLock.lock();
        try {
            Connectable match = processors.get(id);
            if (match == null) {
                match = inputPorts.get(id);
            }
            if (match == null) {
                match = outputPorts.get(id);
            }
            if (match == null) {
                match = funnels.get(id);
            }
            return match;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Removes the given connection from this Process Group.
     * <p>
     * While holding the write lock this verifies the connection can be deleted, stops load balancing on
     * its FlowFile queue, detaches it from its source and destination, drops it from the connection map,
     * and notifies the flow manager.
     *
     * @param connectionToRemove the connection to remove
     * @throws NullPointerException if {@code connectionToRemove} is null
     * @throws IllegalStateException if no connection with that identifier is registered with this group
     */
    @Override
    public void removeConnection(final Connection connectionToRemove) {
        writeLock.lock();
        try {
            // Resolve the instance this group actually holds for the given connection's identifier
            final String connectionId = requireNonNull(connectionToRemove).getIdentifier();
            final Connection registered = connections.get(connectionId);
            if (registered == null) {
                throw new IllegalStateException("Connection " + connectionToRemove.getIdentifier() + " is not a member of this Process Group");
            }

            connectionToRemove.verifyCanDelete();
            connectionToRemove.getFlowFileQueue().stopLoadBalancing();

            final Connectable source = connectionToRemove.getSource();
            final Connectable destination = connectionToRemove.getDestination();

            // Detach from both endpoints; a self-loop has only one endpoint to update
            source.removeConnection(registered);
            if (source != destination) {
                destination.removeConnection(registered);
            }

            connections.remove(registered.getIdentifier());
            LOG.info("{} removed from flow", registered);
            onComponentModified();

            flowManager.onConnectionRemoved(registered);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Returns the connections registered directly with this Process Group.
     *
     * @return a newly created set, populated under the read lock
     */
    @Override
    public Set<Connection> getConnections() {
        readLock.lock();
        try {
            final Set<Connection> snapshot = new HashSet<>(connections.values());
            return snapshot;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Looks up a connection registered directly with this Process Group.
     *
     * @param id the identifier of the connection
     * @return the matching connection, or {@code null} if none is registered under that identifier
     * @throws NullPointerException if {@code id} is null
     */
    @Override
    public Connection getConnection(final String id) {
        readLock.lock();
        try {
            final String connectionId = Objects.requireNonNull(id);
            return connections.get(connectionId);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Finds a connection by ID through the flow manager and returns it only when {@code isOwner} accepts
     * the connection's Process Group.
     *
     * @param id the identifier of the connection
     * @return the connection, or {@code null} if the flow manager does not know it or the ownership check fails
     */
    @Override
    public Connection findConnection(final String id) {
        final Connection candidate = flowManager.getConnection(id);
        if (candidate == null) {
            return null;
        }

        // The flow manager's lookup is not scoped to this group, so the result is filtered by ownership
        return isOwner(candidate.getProcessGroup()) ? candidate : null;
    }


    /**
     * Collects the connections of this Process Group and of all of its descendant groups.
     *
     * @return a new list holding every connection found, starting with this group's own connections
     */
    @Override
    public List<Connection> findAllConnections() {
        final ProcessGroup root = this;
        return findAllConnections(root);
    }

    /**
     * Recursively gathers the connections of the given group followed by those of each child group.
     *
     * @param group the group at which to start
     * @return a new list containing the connections of {@code group} and all of its descendants
     */
    private List<Connection> findAllConnections(final ProcessGroup group) {
        final List<Connection> found = new ArrayList<>(group.getConnections());
        group.getProcessGroups().forEach(child -> found.addAll(findAllConnections(child)));
        return found;
    }

    /**
     * Adds the given label to this Process Group while holding the write lock.
     *
     * @param label the label to add
     * @throws NullPointerException if {@code label} is null
     * @throws IllegalStateException if a label with the same identifier is already registered here
     */
    @Override
    public void addLabel(final Label label) {
        writeLock.lock();
        try {
            final boolean alreadyPresent = labels.get(requireNonNull(label).getIdentifier()) != null;
            if (alreadyPresent) {
                throw new IllegalStateException("A label already exists in this ProcessGroup with ID " + label.getIdentifier());
            }

            label.setProcessGroup(this);
            labels.put(label.getIdentifier(), label);
            onComponentModified();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Removes the given label from this Process Group while holding the write lock.
     *
     * @param label the label to remove
     * @throws NullPointerException if {@code label} is null
     * @throws IllegalStateException if no label with that identifier is registered here
     */
    @Override
    public void removeLabel(final Label label) {
        writeLock.lock();
        try {
            // remove() doubles as the membership check: it yields null when nothing was registered
            final String labelId = requireNonNull(label).getIdentifier();
            if (labels.remove(labelId) == null) {
                throw new IllegalStateException(label + " is not a member of this Process Group.");
            }

            onComponentModified();
            LOG.info("Label with ID {} removed from flow", label.getIdentifier());
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Returns the labels registered directly with this Process Group.
     *
     * @return a newly created set, populated under the read lock
     */
    @Override
    public Set<Label> getLabels() {
        readLock.lock();
        try {
            final Set<Label> snapshot = new HashSet<>(labels.values());
            return snapshot;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Looks up a label registered directly with this Process Group.
     *
     * @param id the identifier of the label
     * @return the matching label, or {@code null} if none is registered under that identifier
     */
    @Override
    public Label getLabel(final String id) {
        readLock.lock();
        try {
            final Label match = labels.get(id);
            return match;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Reports whether this Process Group holds no input ports, output ports, connections, child groups,
     * labels, processors, or Remote Process Groups.
     * <p>
     * NOTE(review): funnels and controller services are not consulted here; confirm that is intended.
     *
     * @return {@code true} if all of the collections listed above are empty
     */
    @Override
    public boolean isEmpty() {
        readLock.lock();
        try {
            if (!inputPorts.isEmpty() || !outputPorts.isEmpty() || !connections.isEmpty()) {
                return false;
            }
            return processGroups.isEmpty() && labels.isEmpty() && processors.isEmpty() && remoteGroups.isEmpty();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Looks up a Remote Process Group registered directly with this Process Group.
     *
     * @param id the identifier of the Remote Process Group
     * @return the matching Remote Process Group, or {@code null} if none is registered under that identifier
     * @throws NullPointerException if {@code id} is null
     */
    @Override
    public RemoteProcessGroup getRemoteProcessGroup(final String id) {
        readLock.lock();
        try {
            final String remoteGroupId = Objects.requireNonNull(id);
            return remoteGroups.get(remoteGroupId);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to start the given processor.
     * <p>
     * A processor whose scheduled state is already RUNNING yields an already-completed future. Otherwise
     * the processor's additional resources are reloaded if necessary before the scheduler is invoked.
     *
     * @param processor the processor to start
     * @param failIfStopping passed through unchanged to the scheduler
     * @return the future returned by the scheduler, or a completed future if the processor is already running
     * @throws IllegalStateException if the processor is not a member of this group or is disabled
     */
    @Override
    public CompletableFuture<Void> startProcessor(final ProcessorNode processor, final boolean failIfStopping) {
        readLock.lock();
        try {
            final boolean member = getProcessor(processor.getIdentifier()) != null;
            if (!member) {
                throw new IllegalStateException("Processor is not a member of this Process Group");
            }

            final ScheduledState currentState = processor.getScheduledState();
            if (currentState == ScheduledState.DISABLED) {
                throw new IllegalStateException("Processor is disabled");
            }
            if (currentState == ScheduledState.RUNNING) {
                // Nothing to do
                return CompletableFuture.completedFuture(null);
            }

            processor.reloadAdditionalResourcesIfNecessary();
            return scheduler.startProcessor(processor, failIfStopping);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to start the given input port. Does nothing if the port's scheduled state is
     * already RUNNING.
     *
     * @param port the input port to start
     * @throws IllegalStateException if the port is not an input port of this group or is disabled
     */
    @Override
    public void startInputPort(final Port port) {
        readLock.lock();
        try {
            final boolean member = getInputPort(port.getIdentifier()) != null;
            if (!member) {
                throw new IllegalStateException("Port " + port.getIdentifier() + " is not a member of this Process Group");
            }

            final ScheduledState currentState = port.getScheduledState();
            if (currentState == ScheduledState.DISABLED) {
                throw new IllegalStateException("InputPort " + port.getIdentifier() + " is disabled");
            }
            if (currentState == ScheduledState.RUNNING) {
                return;
            }

            scheduler.startPort(port);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to start the given output port. Does nothing if the port's scheduled state is
     * already RUNNING.
     *
     * @param port the output port to start
     * @throws IllegalStateException if the port is not an output port of this group or is disabled
     */
    @Override
    public void startOutputPort(final Port port) {
        readLock.lock();
        try {
            final boolean member = getOutputPort(port.getIdentifier()) != null;
            if (!member) {
                throw new IllegalStateException("Port is not a member of this Process Group");
            }

            final ScheduledState currentState = port.getScheduledState();
            if (currentState == ScheduledState.DISABLED) {
                throw new IllegalStateException("OutputPort is disabled");
            }
            if (currentState == ScheduledState.RUNNING) {
                return;
            }

            scheduler.startPort(port);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to start the given funnel unless its scheduled state is already RUNNING.
     *
     * @param funnel the funnel to start
     * @throws IllegalStateException if the funnel is not a member of this group
     */
    @Override
    public void startFunnel(final Funnel funnel) {
        readLock.lock();
        try {
            final boolean member = getFunnel(funnel.getIdentifier()) != null;
            if (!member) {
                throw new IllegalStateException("Funnel is not a member of this Process Group");
            }

            if (funnel.getScheduledState() != ScheduledState.RUNNING) {
                scheduler.startFunnel(funnel);
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to stop the given processor.
     *
     * @param processor the processor to stop
     * @return the future returned by the scheduler
     * @throws IllegalStateException if the processor is not a member of this group or is disabled
     */
    @Override
    public CompletableFuture<Void> stopProcessor(final ProcessorNode processor) {
        readLock.lock();
        try {
            final String processorId = processor.getIdentifier();
            if (!processors.containsKey(processorId)) {
                throw new IllegalStateException("No processor with ID " + processorId + " belongs to this Process Group");
            }

            if (processor.getScheduledState() == ScheduledState.DISABLED) {
                throw new IllegalStateException("Processor is disabled");
            }

            return scheduler.stopProcessor(processor);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to terminate the given processor. Termination is only permitted when the
     * processor's scheduled state is STOPPED.
     *
     * @param processor the processor to terminate
     * @throws IllegalStateException if the processor is not a member of this group or is not stopped
     */
    @Override
    public void terminateProcessor(final ProcessorNode processor) {
        readLock.lock();
        try {
            final String processorId = processor.getIdentifier();
            if (!processors.containsKey(processorId)) {
                throw new IllegalStateException("No processor with ID " + processorId + " belongs to this Process Group");
            }

            final boolean stopped = processor.getScheduledState() == ScheduledState.STOPPED;
            if (!stopped) {
                throw new IllegalStateException("Cannot terminate processor with ID " + processor.getIdentifier() + " because it is not stopped");
            }

            scheduler.terminateProcessor(processor);
        } finally {
            readLock.unlock();
        }
    }


    /**
     * Asks the scheduler to stop the given input port. Does nothing if the port's scheduled state is
     * already STOPPED.
     *
     * @param port the input port to stop
     * @throws IllegalStateException if the port is not an input port of this group or is disabled
     */
    @Override
    public void stopInputPort(final Port port) {
        readLock.lock();
        try {
            final String portId = port.getIdentifier();
            if (!inputPorts.containsKey(portId)) {
                throw new IllegalStateException("No Input Port with ID " + portId + " belongs to this Process Group");
            }

            final ScheduledState currentState = port.getScheduledState();
            if (currentState == ScheduledState.DISABLED) {
                throw new IllegalStateException("InputPort is disabled");
            }
            if (currentState == ScheduledState.STOPPED) {
                return;
            }

            scheduler.stopPort(port);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to stop the given output port. Does nothing if the port's scheduled state is
     * already STOPPED.
     *
     * @param port the output port to stop
     * @throws IllegalStateException if the port is not an output port of this group or is disabled
     */
    @Override
    public void stopOutputPort(final Port port) {
        readLock.lock();
        try {
            final String portId = port.getIdentifier();
            if (!outputPorts.containsKey(portId)) {
                throw new IllegalStateException("No Output Port with ID " + portId + " belongs to this Process Group");
            }

            final ScheduledState currentState = port.getScheduledState();
            if (currentState == ScheduledState.DISABLED) {
                throw new IllegalStateException("OutputPort is disabled");
            }
            if (currentState == ScheduledState.STOPPED) {
                return;
            }

            scheduler.stopPort(port);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to stop the given funnel. Does nothing if the funnel's scheduled state is
     * already STOPPED.
     *
     * @param funnel the funnel to stop
     * @throws IllegalStateException if the funnel is not a member of this group or is disabled
     */
    private void stopFunnel(final Funnel funnel) {
        readLock.lock();
        try {
            final String funnelId = funnel.getIdentifier();
            if (!funnels.containsKey(funnelId)) {
                throw new IllegalStateException("No Funnel with ID " + funnelId + " belongs to this Process Group");
            }

            final ScheduledState currentState = funnel.getScheduledState();
            if (currentState == ScheduledState.DISABLED) {
                throw new IllegalStateException("Funnel is disabled");
            }
            if (currentState == ScheduledState.STOPPED) {
                return;
            }

            scheduler.stopFunnel(funnel);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to enable the given input port. Does nothing if the port's scheduled state is
     * already STOPPED.
     *
     * @param port the input port to enable
     * @throws IllegalStateException if the port is not an input port of this group or is currently running
     */
    @Override
    public void enableInputPort(final Port port) {
        readLock.lock();
        try {
            final String portId = port.getIdentifier();
            if (!inputPorts.containsKey(portId)) {
                throw new IllegalStateException("No Input Port with ID " + portId + " belongs to this Process Group");
            }

            final ScheduledState currentState = port.getScheduledState();
            if (currentState == ScheduledState.STOPPED) {
                return;
            }
            if (currentState == ScheduledState.RUNNING) {
                throw new IllegalStateException("InputPort is currently running");
            }

            scheduler.enablePort(port);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to enable the given output port. Does nothing if the port's scheduled state is
     * already STOPPED.
     *
     * @param port the output port to enable
     * @throws IllegalStateException if the port is not an output port of this group or is currently running
     */
    @Override
    public void enableOutputPort(final Port port) {
        readLock.lock();
        try {
            final String portId = port.getIdentifier();
            if (!outputPorts.containsKey(portId)) {
                throw new IllegalStateException("No Output Port with ID " + portId + " belongs to this Process Group");
            }

            final ScheduledState currentState = port.getScheduledState();
            if (currentState == ScheduledState.STOPPED) {
                return;
            }
            if (currentState == ScheduledState.RUNNING) {
                throw new IllegalStateException("OutputPort is currently running");
            }

            scheduler.enablePort(port);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to enable the given processor. Does nothing if the processor's scheduled state
     * is already STOPPED.
     *
     * @param processor the processor to enable
     * @throws IllegalStateException if the processor is not a member of this group or is currently running
     */
    @Override
    public void enableProcessor(final ProcessorNode processor) {
        readLock.lock();
        try {
            final String processorId = processor.getIdentifier();
            if (!processors.containsKey(processorId)) {
                throw new IllegalStateException("No Processor with ID " + processorId + " belongs to this Process Group");
            }

            final ScheduledState currentState = processor.getScheduledState();
            if (currentState == ScheduledState.STOPPED) {
                return;
            }
            if (currentState == ScheduledState.RUNNING) {
                throw new IllegalStateException("Processor is currently running");
            }

            scheduler.enableProcessor(processor);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to disable the given input port. Does nothing if the port's scheduled state is
     * already DISABLED.
     *
     * @param port the input port to disable
     * @throws IllegalStateException if the port is not an input port of this group or is currently running
     */
    @Override
    public void disableInputPort(final Port port) {
        readLock.lock();
        try {
            final String portId = port.getIdentifier();
            if (!inputPorts.containsKey(portId)) {
                throw new IllegalStateException("No InputPort with ID " + portId + " belongs to this Process Group");
            }

            final ScheduledState currentState = port.getScheduledState();
            if (currentState == ScheduledState.DISABLED) {
                return;
            }
            if (currentState == ScheduledState.RUNNING) {
                throw new IllegalStateException("InputPort is currently running");
            }

            scheduler.disablePort(port);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to disable the given output port. Does nothing if the port's scheduled state is
     * already DISABLED.
     *
     * @param port the output port to disable
     * @throws IllegalStateException if the port is not an output port of this group or is currently running
     */
    @Override
    public void disableOutputPort(final Port port) {
        readLock.lock();
        try {
            final String portId = port.getIdentifier();
            if (!outputPorts.containsKey(portId)) {
                throw new IllegalStateException("No OutputPort with ID " + portId + " belongs to this Process Group");
            }

            final ScheduledState currentState = port.getScheduledState();
            if (currentState == ScheduledState.DISABLED) {
                return;
            }
            if (currentState == ScheduledState.RUNNING) {
                throw new IllegalStateException("OutputPort is currently running");
            }

            scheduler.disablePort(port);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Asks the scheduler to disable the given processor. Does nothing if the processor's scheduled state
     * is already DISABLED.
     *
     * @param processor the processor to disable
     * @throws IllegalStateException if the processor is not a member of this group or is currently running
     */
    @Override
    public void disableProcessor(final ProcessorNode processor) {
        readLock.lock();
        try {
            final String processorId = processor.getIdentifier();
            if (!processors.containsKey(processorId)) {
                throw new IllegalStateException("No Processor with ID " + processorId + " belongs to this Process Group");
            }

            final ScheduledState currentState = processor.getScheduledState();
            if (currentState == ScheduledState.DISABLED) {
                return;
            }
            if (currentState == ScheduledState.RUNNING) {
                throw new IllegalStateException("Processor is currently running");
            }

            scheduler.disableProcessor(processor);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Two {@code StandardProcessGroup} instances are equal when their identifiers are equal.
     *
     * @param obj the object to compare against
     * @return {@code true} if {@code obj} is a {@code StandardProcessGroup} with the same identifier
     */
    @Override
    public boolean equals(final Object obj) {
        if (!(obj instanceof StandardProcessGroup)) {
            return false;
        }

        final StandardProcessGroup that = (StandardProcessGroup) obj;
        return getIdentifier().equals(that.getIdentifier());
    }

    /**
     * Computes the hash code from the identifier only, matching the field used by {@code equals}.
     *
     * @return the hash code derived from this group's identifier
     */
    @Override
    public int hashCode() {
        final HashCodeBuilder builder = new HashCodeBuilder();
        builder.append(getIdentifier());
        return builder.toHashCode();
    }

    /**
     * Builds a short-prefix style description containing this group's identifier and name.
     *
     * @return the string representation of this Process Group
     */
    @Override
    public String toString() {
        final ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
        builder.append("identifier", getIdentifier());
        builder.append("name", getName());
        return builder.toString();
    }

    /**
     * Finds a Process Group by ID. This group itself is returned when the ID matches its own identifier.
     * Otherwise the flow manager is consulted, and its result is returned only when {@code isOwner}
     * accepts that group's parent.
     *
     * @param id the identifier of the Process Group
     * @return the matching group, or {@code null} if it is unknown to the flow manager or the ownership check fails
     * @throws NullPointerException if {@code id} is null
     */
    @Override
    public ProcessGroup findProcessGroup(final String id) {
        if (requireNonNull(id).equals(getIdentifier())) {
            return this;
        }

        final ProcessGroup candidate = flowManager.getGroup(id);
        if (candidate == null) {
            return null;
        }

        // The flow manager's lookup is not scoped to this group, so the result is filtered by ownership
        return isOwner(candidate.getParent()) ? candidate : null;
    }

    /**
     * @return every Process Group nested beneath this one, at any depth
     */
    @Override
    public List<ProcessGroup> findAllProcessGroups() {
        final List<ProcessGroup> descendants = findAllProcessGroups(this);
        return descendants;
    }

    /**
     * Collects this group (when it passes the filter) together with whatever each child group
     * reports for the same filter.
     *
     * @param filter the predicate a group must satisfy to be included
     * @return the matching groups
     */
    @Override
    public List<ProcessGroup> findAllProcessGroups(final Predicate<ProcessGroup> filter) {
        final List<ProcessGroup> accepted = new ArrayList<>();

        final boolean selfMatches = filter.test(this);
        if (selfMatches) {
            accepted.add(this);
        }

        getProcessGroups().forEach(child -> accepted.addAll(child.findAllProcessGroups(filter)));
        return accepted;
    }

    /**
     * Gathers the direct children of the given group, followed by the descendants of each child.
     */
    private List<ProcessGroup> findAllProcessGroups(final ProcessGroup start) {
        final List<ProcessGroup> gathered = new ArrayList<>(start.getProcessGroups());
        start.getProcessGroups().forEach(child -> gathered.addAll(findAllProcessGroups(child)));
        return gathered;
    }

    /**
     * @return the Remote Process Groups of this group and of every nested group
     */
    @Override
    public List<RemoteProcessGroup> findAllRemoteProcessGroups() {
        final List<RemoteProcessGroup> allRemoteGroups = findAllRemoteProcessGroups(this);
        return allRemoteGroups;
    }

    /**
     * Gathers the Remote Process Groups of the given group, then those of each nested group.
     */
    private List<RemoteProcessGroup> findAllRemoteProcessGroups(final ProcessGroup start) {
        final List<RemoteProcessGroup> gathered = new ArrayList<>(start.getRemoteProcessGroups());
        start.getProcessGroups().forEach(child -> gathered.addAll(findAllRemoteProcessGroups(child)));
        return gathered;
    }

    /**
     * Searches this group and its nested groups for a Remote Process Group with the given ID.
     *
     * @param id the identifier to look up; must not be null
     * @return the matching Remote Process Group, or null if none is found
     */
    @Override
    public RemoteProcessGroup findRemoteProcessGroup(final String id) {
        final String requestedId = requireNonNull(id);
        return findRemoteProcessGroup(requestedId, this);
    }

    /**
     * Checks the given group first and then descends into its child groups, returning the first match.
     */
    private RemoteProcessGroup findRemoteProcessGroup(final String id, final ProcessGroup start) {
        final RemoteProcessGroup direct = start.getRemoteProcessGroup(id);
        if (direct != null) {
            return direct;
        }

        // Lazily evaluated: the search stops at the first child hierarchy that yields a match
        return start.getProcessGroups().stream()
            .map(child -> findRemoteProcessGroup(id, child))
            .filter(match -> match != null)
            .findFirst()
            .orElse(null);
    }

    /**
     * Looks up a Processor by ID, returning it only when its Process Group is this group
     * or is nested beneath this group.
     *
     * @param id the Processor identifier
     * @return the Processor, or null if it does not exist within this group's hierarchy
     */
    @Override
    public ProcessorNode findProcessor(final String id) {
        final ProcessorNode candidate = flowManager.getProcessorNode(id);

        // The flow manager lookup is flow-wide, so ownership must be confirmed before returning
        final boolean ownedByThisGroup = candidate != null && isOwner(candidate.getProcessGroup());
        return ownedByThisGroup ? candidate : null;
    }

    /**
     * Walks up the parent chain starting at the given group.
     *
     * @param owner the group to start from; may be null
     * @return true if this group is the given group or one of its ancestors
     */
    private boolean isOwner(ProcessGroup owner) {
        for (ProcessGroup current = owner; current != null; current = current.getParent()) {
            if (current == this) {
                return true;
            }
        }

        return false;
    }

    /**
     * @return the Processors of this group and of every nested group
     */
    @Override
    public List<ProcessorNode> findAllProcessors() {
        final List<ProcessorNode> allProcessors = findAllProcessors(this);
        return allProcessors;
    }

    /**
     * Gathers the Processors of the given group, then those of each nested group.
     */
    private List<ProcessorNode> findAllProcessors(final ProcessGroup start) {
        final List<ProcessorNode> gathered = new ArrayList<>(start.getProcessors());
        start.getProcessGroups().forEach(child -> gathered.addAll(findAllProcessors(child)));
        return gathered;
    }


    /**
     * Searches the Remote Process Groups of this group for an input or output port with the
     * given identifier, then delegates to each child group.
     *
     * @param identifier the port identifier
     * @return the matching port, or null if none is found
     */
    @Override
    public RemoteGroupPort findRemoteGroupPort(final String identifier) {
        readLock.lock();
        try {
            for (final RemoteProcessGroup rpg : remoteGroups.values()) {
                // Input ports take precedence; output ports are consulted only on a miss
                RemoteGroupPort match = rpg.getInputPort(identifier);
                if (match == null) {
                    match = rpg.getOutputPort(identifier);
                }

                if (match != null) {
                    return match;
                }
            }

            for (final ProcessGroup child : processGroups.values()) {
                final RemoteGroupPort nestedMatch = child.findRemoteGroupPort(identifier);
                if (nestedMatch != null) {
                    return nestedMatch;
                }
            }

            return null;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Searches this group and its nested groups for a Label with the given ID.
     *
     * @param id the Label identifier
     * @return the matching Label, or null if none is found
     */
    @Override
    public Label findLabel(final String id) {
        final Label located = findLabel(id, this);
        return located;
    }

    /**
     * Checks the given group first and then descends into its child groups, returning the first match.
     */
    private Label findLabel(final String id, final ProcessGroup start) {
        final Label direct = start.getLabel(id);
        if (direct != null) {
            return direct;
        }

        for (final ProcessGroup child : start.getProcessGroups()) {
            final Label nested = findLabel(id, child);
            if (nested != null) {
                return nested;
            }
        }

        return null;
    }

    /**
     * @return the Labels of this group and of every nested group
     */
    @Override
    public List<Label> findAllLabels() {
        final List<Label> allLabels = findAllLabels(this);
        return allLabels;
    }

    /**
     * Gathers the Labels of the given group, then those of each nested group.
     */
    private List<Label> findAllLabels(final ProcessGroup start) {
        final List<Label> gathered = new ArrayList<>(start.getLabels());
        start.getProcessGroups().forEach(child -> gathered.addAll(findAllLabels(child)));
        return gathered;
    }

    /**
     * Looks up an Input Port by ID, returning it only when its Process Group is this group
     * or is nested beneath this group.
     *
     * @param id the port identifier
     * @return the port, or null if it does not exist within this group's hierarchy
     */
    @Override
    public Port findInputPort(final String id) {
        final Port candidate = flowManager.getInputPort(id);
        final boolean ownedByThisGroup = candidate != null && isOwner(candidate.getProcessGroup());
        return ownedByThisGroup ? candidate : null;
    }

    /**
     * @return the Input Ports of this group and of every nested group
     */
    @Override
    public List<Port> findAllInputPorts() {
        final List<Port> allInputPorts = findAllInputPorts(this);
        return allInputPorts;
    }

    /**
     * Gathers the Input Ports of the given group, then those of each nested group.
     */
    private List<Port> findAllInputPorts(final ProcessGroup start) {
        final List<Port> gatheredInputPorts = new ArrayList<>(start.getInputPorts());
        start.getProcessGroups().forEach(child -> gatheredInputPorts.addAll(findAllInputPorts(child)));
        return gatheredInputPorts;
    }

    /**
     * Looks up an Output Port by ID, returning it only when its Process Group is this group
     * or is nested beneath this group.
     *
     * @param id the port identifier
     * @return the port, or null if it does not exist within this group's hierarchy
     */
    @Override
    public Port findOutputPort(final String id) {
        final Port candidate = flowManager.getOutputPort(id);
        final boolean ownedByThisGroup = candidate != null && isOwner(candidate.getProcessGroup());
        return ownedByThisGroup ? candidate : null;
    }

    /**
     * @return the Output Ports of this group and of every nested group
     */
    @Override
    public List<Port> findAllOutputPorts() {
        final List<Port> allOutputPorts = findAllOutputPorts(this);
        return allOutputPorts;
    }

    /**
     * Gathers the Output Ports of the given group, then those of each nested group.
     */
    private List<Port> findAllOutputPorts(final ProcessGroup start) {
        final List<Port> gatheredOutputPorts = new ArrayList<>(start.getOutputPorts());
        start.getProcessGroups().forEach(child -> gatheredOutputPorts.addAll(findAllOutputPorts(child)));
        return gatheredOutputPorts;
    }

    /**
     * @return the Funnels of this group and of every nested group
     */
    @Override
    public List<Funnel> findAllFunnels() {
        final List<Funnel> allFunnels = findAllFunnels(this);
        return allFunnels;
    }

    /**
     * Gathers the Funnels of the given group, then those of each nested group.
     */
    private List<Funnel> findAllFunnels(final ProcessGroup start) {
        final List<Funnel> gathered = new ArrayList<>(start.getFunnels());
        start.getProcessGroups().forEach(child -> gathered.addAll(findAllFunnels(child)));
        return gathered;
    }

    /**
     * @param name the port name to match
     * @return the Input Port of this group with the given name, or null if there is none
     */
    @Override
    public Port getInputPortByName(final String name) {
        final PortRetriever retriever = new InputPortRetriever();
        return getPortByName(name, this, retriever);
    }

    /**
     * @param name the port name to match
     * @return the Output Port of this group with the given name, or null if there is none
     */
    @Override
    public Port getOutputPortByName(final String name) {
        final PortRetriever retriever = new OutputPortRetriever();
        return getPortByName(name, this, retriever);
    }

    /**
     * Abstracts over input and output ports so that lookups can be written once for both kinds.
     */
    private interface PortRetriever {

        /**
         * @return the ports of the relevant kind that belong to the given group
         */
        Set<Port> getPorts(ProcessGroup group);

        /**
         * @return the port of the relevant kind with the given ID in the given group
         */
        Port getPort(ProcessGroup group, String id);
    }

    /**
     * {@link PortRetriever} backed by a group's Input Ports.
     */
    private static class InputPortRetriever implements PortRetriever {

        @Override
        public Port getPort(final ProcessGroup group, final String id) {
            return group.getInputPort(id);
        }

        @Override
        public Set<Port> getPorts(final ProcessGroup group) {
            return group.getInputPorts();
        }
    }

    /**
     * {@link PortRetriever} backed by a group's Output Ports.
     */
    private static class OutputPortRetriever implements PortRetriever {

        @Override
        public Port getPort(final ProcessGroup group, final String id) {
            return group.getOutputPort(id);
        }

        @Override
        public Set<Port> getPorts(final ProcessGroup group) {
            return group.getOutputPorts();
        }
    }

    /**
     * Returns the first port supplied by the retriever for the given group whose name equals
     * the requested name, or null when no port matches.
     */
    private Port getPortByName(final String name, final ProcessGroup group, final PortRetriever retriever) {
        return retriever.getPorts(group).stream()
            .filter(candidate -> candidate.getName().equals(name))
            .findFirst()
            .orElse(null);
    }

    /**
     * Adds the given Funnel to this group and starts it.
     *
     * @param funnel the Funnel to add
     */
    @Override
    public void addFunnel(final Funnel funnel) {
        final boolean autoStart = true;
        addFunnel(funnel, autoStart);
    }

    /**
     * Registers the given Funnel with this group and the flow manager, optionally starting it.
     *
     * @param funnel    the Funnel to add; must not be null
     * @param autoStart whether the Funnel should be started once it has been added
     * @throws IllegalStateException if a Funnel with the same ID already exists in this group
     */
    @Override
    public void addFunnel(final Funnel funnel, final boolean autoStart) {
        writeLock.lock();
        try {
            final String funnelId = requireNonNull(funnel).getIdentifier();
            if (funnels.get(funnelId) != null) {
                throw new IllegalStateException("A funnel already exists in this ProcessGroup with ID " + funnelId);
            }

            funnel.setProcessGroup(this);
            funnels.put(funnel.getIdentifier(), funnel);
            flowManager.onFunnelAdded(funnel);

            if (autoStart) {
                startFunnel(funnel);
            }

            onComponentModified();
            LOG.info("{} added to {}", funnel, this);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * @param id the Funnel identifier
     * @return the Funnel registered directly with this group under the given ID, or null
     */
    @Override
    public Funnel getFunnel(final String id) {
        readLock.lock();
        try {
            final Funnel registered = funnels.get(id);
            return registered;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Looks up a Funnel by ID, returning it only when its Process Group is this group
     * or is nested beneath this group.
     *
     * @param id the Funnel identifier
     * @return the Funnel, or null if it does not exist within this group's hierarchy
     */
    @Override
    public Funnel findFunnel(final String id) {
        final Funnel candidate = flowManager.getFunnel(id);
        if (candidate != null && isOwner(candidate.getProcessGroup())) {
            return candidate;
        }

        return null;
    }


    /**
     * Locates a Controller Service by ID.
     *
     * @param id                 the service identifier
     * @param includeDescendants whether nested groups should be searched in addition to this group
     * @param includeAncestors   whether the parent chain should be searched when nothing is found below
     * @return the matching service, or null if none is found
     */
    @Override
    public ControllerServiceNode findControllerService(final String id, final boolean includeDescendants, final boolean includeAncestors) {
        ControllerServiceNode located = includeDescendants
            ? findDescendantControllerService(id, this)
            : getControllerService(id);

        // Ancestors are consulted only as a fallback
        if (located == null && includeAncestors) {
            located = findAncestorControllerService(id, getParent());
        }

        return located;
    }

    /**
     * Walks up the parent chain beginning at the given group and returns the first Controller
     * Service registered under the given ID, or null once the chain is exhausted.
     */
    private ControllerServiceNode findAncestorControllerService(final String id, final ProcessGroup start) {
        for (ProcessGroup current = start; current != null; current = current.getParent()) {
            final ControllerServiceNode located = current.getControllerService(id);
            if (located != null) {
                return located;
            }
        }

        return null;
    }

    /**
     * Checks the given group first and then descends into its child groups, returning the first
     * Controller Service found under the given ID.
     */
    private ControllerServiceNode findDescendantControllerService(final String id, final ProcessGroup start) {
        final ControllerServiceNode direct = start.getControllerService(id);
        if (direct != null) {
            return direct;
        }

        for (final ProcessGroup child : start.getProcessGroups()) {
            final ControllerServiceNode nested = findDescendantControllerService(id, child);
            if (nested != null) {
                return nested;
            }
        }

        return null;
    }

    /**
     * @return the Controller Services of this group and of every nested group
     */
    @Override
    public Set<ControllerServiceNode> findAllControllerServices() {
        final Set<ControllerServiceNode> allServices = findAllControllerServices(this);
        return allServices;
    }

    /**
     * Gathers the Controller Services registered directly with the given group, then those of
     * each nested group.
     *
     * @param start the group at which to begin
     * @return a new, mutable Set holding the services of the given group and its descendants
     */
    private Set<ControllerServiceNode> findAllControllerServices(ProcessGroup start) {
        // Copy before accumulating: the Set handed back by another ProcessGroup implementation
        // is not ours to mutate and may be unmodifiable or shared.
        final Set<ControllerServiceNode> services = new HashSet<>(start.getControllerServices(false));
        for (final ProcessGroup group : start.getProcessGroups()) {
            services.addAll(findAllControllerServices(group));
        }

        return services;
    }

    /**
     * Removes the given Funnel, together with the connections it reports via
     * {@code getConnections()}, from this group.
     *
     * @param funnel the Funnel to remove; must not be null
     * @throws IllegalStateException if the Funnel is not a member of this group
     */
    @Override
    public void removeFunnel(final Funnel funnel) {
        writeLock.lock();
        try {
            final String funnelId = requireNonNull(funnel).getIdentifier();
            if (funnels.get(funnelId) == null) {
                throw new IllegalStateException("Funnel " + funnelId + " is not a member of this ProcessGroup");
            }

            // Run every check before anything is stopped or removed
            funnel.verifyCanDelete();
            funnel.getConnections().forEach(connection -> connection.verifyCanDelete());

            stopFunnel(funnel);

            // Iterate over a snapshot to avoid a concurrent modification while connections are removed
            final Set<Connection> snapshot = new HashSet<>(funnel.getConnections());
            snapshot.forEach(connection -> removeConnection(connection));

            funnels.remove(funnel.getIdentifier());
            onComponentModified();

            flowManager.onFunnelRemoved(funnel);
            LOG.info("{} removed from flow", funnel);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * @return a copy of the Funnels registered directly with this group
     */
    @Override
    public Set<Funnel> getFunnels() {
        readLock.lock();
        try {
            final Set<Funnel> snapshot = new HashSet<>(funnels.values());
            return snapshot;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Registers the given Controller Service with this group, parenting the service's variable
     * registry to this group's registry.
     *
     * @param service the service to add; must not be null
     * @throws IllegalStateException if a service with the same ID is already registered here
     */
    @Override
    public void addControllerService(final ControllerServiceNode service) {
        writeLock.lock();
        try {
            final String id = requireNonNull(service).getIdentifier();
            if (controllerServices.get(id) != null) {
                throw new IllegalStateException("A Controller Service is already registered to this ProcessGroup with ID " + id);
            }

            service.setProcessGroup(this);
            service.getVariableRegistry().setParent(getVariableRegistry());
            controllerServices.put(service.getIdentifier(), service);
            LOG.info("{} added to {}", service, this);

            updateControllerServiceReferences(service);
            onComponentModified();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * @param id the service identifier; must not be null
     * @return the Controller Service registered directly with this group under the given ID, or null
     */
    @Override
    public ControllerServiceNode getControllerService(final String id) {
        readLock.lock();
        try {
            final String requestedId = requireNonNull(id);
            return controllerServices.get(requestedId);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the Controller Services registered with this group.
     *
     * @param recursive when true, the services reported by the parent group (also queried
     *                  with {@code recursive = true}) are included as well
     * @return a new Set holding the services
     */
    @Override
    public Set<ControllerServiceNode> getControllerServices(final boolean recursive) {
        final Set<ControllerServiceNode> collected = new HashSet<>();

        readLock.lock();
        try {
            collected.addAll(controllerServices.values());
        } finally {
            readLock.unlock();
        }

        if (!recursive) {
            return collected;
        }

        final ProcessGroup parentGroup = parent.get();
        if (parentGroup != null) {
            collected.addAll(parentGroup.getControllerServices(true));
        }

        return collected;
    }

    /**
     * Removes the given Controller Service from this group. The service's {@link OnRemoved}
     * methods are invoked, references it holds to other Controller Services are released,
     * groups containing components that reference it are notified, its state is cleaned up
     * and, once removal has succeeded, its instance class loader is discarded.
     *
     * @param service the service to remove; must not be null
     * @throws IllegalStateException if the service is not a member of this Process Group
     */
    @Override
    public void removeControllerService(final ControllerServiceNode service) {
        boolean removed = false;
        writeLock.lock();
        try {
            final ControllerServiceNode existing = controllerServices.get(requireNonNull(service).getIdentifier());
            if (existing == null) {
                throw new IllegalStateException("ControllerService " + service.getIdentifier() + " is not a member of this Process Group");
            }

            service.verifyCanDelete();

            // Invoke the lifecycle callbacks with the component's NAR class loader in effect
            try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), service.getControllerServiceImplementation().getClass(), service.getIdentifier())) {
                final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null, variableRegistry);
                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext);
            }

            // Release the references this service holds to other Controller Services
            for (final Map.Entry<PropertyDescriptor, String> entry : service.getEffectivePropertyValues().entrySet()) {
                final PropertyDescriptor descriptor = entry.getKey();
                if (descriptor.getControllerServiceDefinition() != null) {
                    final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
                    if (value != null) {
                        final ControllerServiceNode referencedNode = controllerServiceProvider.getControllerServiceNode(value);
                        if (referencedNode != null) {
                            referencedNode.removeReference(service, descriptor);
                        }
                    }
                }
            }

            controllerServices.remove(service.getIdentifier());
            onComponentModified();

            // For any component that references this Controller Service, find the component's Process Group
            // and notify the Process Group that a component has been modified. This way, we know to re-calculate
            // whether or not the Process Group has local modifications.
            service.getReferences().getReferencingComponents().stream()
                    .map(ComponentNode::getProcessGroupIdentifier)
                    .filter(id -> !id.equals(getIdentifier()))
                    .forEach(groupId -> {
                        final ProcessGroup descendant = findProcessGroup(groupId);
                        if (descendant != null) {
                            descendant.onComponentModified();
                        }
                    });

            flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier());

            removed = true;
            LOG.info("{} removed from {}", service, this);
        } finally {
            if (removed) {
                try {
                    flowController.getExtensionManager().removeInstanceClassLoader(service.getIdentifier());
                } catch (Throwable t) {
                    // Best-effort cleanup: the service has already been removed, so the failure must not
                    // propagate, but it is logged rather than silently discarded.
                    LOG.warn("Failed to remove instance ClassLoader for {} after removing it from {}", service, this, t);
                }
            }
            writeLock.unlock();
        }
    }

    /**
     * Registers the given Template with this group.
     *
     * @param template the Template to add; must not be null
     * @throws IllegalStateException if the Template has no ID or this group already holds a Template with that ID
     */
    @Override
    public void addTemplate(final Template template) {
        requireNonNull(template);

        writeLock.lock();
        try {
            final String templateId = template.getDetails().getId();
            if (templateId == null) {
                throw new IllegalStateException("Cannot add template that has no ID");
            }

            final boolean alreadyPresent = templates.containsKey(templateId);
            if (alreadyPresent) {
                throw new IllegalStateException("Process Group already contains a Template with ID " + templateId);
            }

            templates.put(templateId, template);
            template.setProcessGroup(this);
            LOG.info("{} added to {}", template, this);
            onComponentModified();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * @param id the Template identifier
     * @return the Template registered directly with this group under the given ID, or null
     */
    @Override
    public Template getTemplate(final String id) {
        readLock.lock();
        try {
            final Template registered = templates.get(id);
            return registered;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Searches this group and its nested groups for a Template with the given ID.
     *
     * @param id the Template identifier
     * @return the matching Template, or null if none is found
     */
    @Override
    public Template findTemplate(final String id) {
        final Template located = findTemplate(id, this);
        return located;
    }

    /**
     * Checks the given group first and then descends into its child groups, returning the first match.
     */
    private Template findTemplate(final String id, final ProcessGroup start) {
        final Template direct = start.getTemplate(id);
        if (direct != null) {
            return direct;
        }

        for (final ProcessGroup nestedGroup : start.getProcessGroups()) {
            final Template nested = findTemplate(id, nestedGroup);
            if (nested != null) {
                return nested;
            }
        }

        return null;
    }

    /**
     * @return a copy of the Templates registered directly with this group
     */
    @Override
    public Set<Template> getTemplates() {
        readLock.lock();
        try {
            final Set<Template> snapshot = new HashSet<>(templates.values());
            return snapshot;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * @return the Templates of this group and of every nested group
     */
    @Override
    public Set<Template> findAllTemplates() {
        final Set<Template> allTemplates = findAllTemplates(this);
        return allTemplates;
    }

    /**
     * Gathers the Templates of the given group, then those of each nested group.
     */
    private Set<Template> findAllTemplates(final ProcessGroup group) {
        final Set<Template> gathered = new HashSet<>(group.getTemplates());
        group.getProcessGroups().forEach(child -> gathered.addAll(findAllTemplates(child)));
        return gathered;
    }

    /**
     * Removes the given Template from this group.
     *
     * @param template the Template to remove; must not be null
     * @throws IllegalStateException if the Template is not a member of this group
     */
    @Override
    public void removeTemplate(final Template template) {
        writeLock.lock();
        try {
            final String templateId = requireNonNull(template).getIdentifier();
            if (templates.get(templateId) == null) {
                throw new IllegalStateException("Template " + templateId + " is not a member of this ProcessGroup");
            }

            templates.remove(template.getIdentifier());
            onComponentModified();

            LOG.info("{} removed from flow", template);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Removes every component referenced by the given snippet from this Process Group.
     * All verification steps run before the first component is removed, and the whole
     * operation is performed while holding the write lock.
     *
     * @param snippet the snippet identifying the components to remove
     * @throws IllegalStateException if a connectable has a connection that is not held by this group,
     *                               if a Processor is running or still has active threads, or if a
     *                               connectable has an incoming connection that is neither part of the
     *                               snippet nor sourced from a connectable in the snippet; the delegated
     *                               verification calls may throw as well
     */
    @Override
    public void remove(final Snippet snippet) {
        writeLock.lock();
        try {
            // ensure that all components are valid
            verifyContents(snippet);

            final Set<Connectable> connectables = getAllConnectables(snippet);
            // Start with the connections named in the snippet; connections of the selected connectables are added below
            final Set<String> connectionIdsToRemove = new HashSet<>(getKeys(snippet.getConnections()));
            // Remove all connections that are the output of any Connectable.
            for (final Connectable connectable : connectables) {
                for (final Connection conn : connectable.getConnections()) {
                    // A connection that is not registered with this group cannot be removed from here
                    if (!connections.containsKey(conn.getIdentifier())) {
                        throw new IllegalStateException("Connectable component " + connectable.getIdentifier()
                                + " cannot be removed because it has incoming connections from the parent Process Group");
                    }
                    connectionIdsToRemove.add(conn.getIdentifier());
                }
            }

            // verify that all connections can be removed
            for (final String id : connectionIdsToRemove) {
                connections.get(id).verifyCanDelete();
            }

            // verify that all processors are stopped and have no active threads
            for (final String procId : snippet.getProcessors().keySet()) {
                final ProcessorNode procNode = getProcessor(procId);
                if (procNode.isRunning()) {
                    throw new IllegalStateException("Processor " + procNode.getIdentifier() + " cannot be removed because it is running");
                }
                final int activeThreadCount = scheduler.getActiveThreadCount(procNode);
                if (activeThreadCount != 0) {
                    throw new IllegalStateException("Processor " + procNode.getIdentifier() + " cannot be removed because it still has " + activeThreadCount + " active threads");
                }
            }

            // verify that none of the connectables have incoming connections that are not in the Snippet.
            final Set<String> connectionIds = snippet.getConnections().keySet();
            for (final Connectable connectable : connectables) {
                for (final Connection conn : connectable.getIncomingConnections()) {
                    // An incoming connection is acceptable if it is selected itself or if its source is selected
                    if (!connectionIds.contains(conn.getIdentifier()) && !connectables.contains(conn.getSource())) {
                        throw new IllegalStateException("Connectable component " + connectable.getIdentifier() + " cannot be removed because it has incoming connections "
                                + "that are not selected to be deleted");
                    }
                }
            }

            // verify that all of the ProcessGroups in the snippet are empty
            for (final String groupId : snippet.getProcessGroups().keySet()) {
                final ProcessGroup toRemove = getProcessGroup(groupId);
                toRemove.verifyCanDelete(true);
            }

            // All checks passed: flag the modification, then remove connections first, followed by the components
            onComponentModified();

            for (final String id : connectionIdsToRemove) {
                removeConnection(connections.get(id));
            }
            for (final String id : getKeys(snippet.getInputPorts())) {
                removeInputPort(inputPorts.get(id));
            }
            for (final String id : getKeys(snippet.getOutputPorts())) {
                removeOutputPort(outputPorts.get(id));
            }
            for (final String id : getKeys(snippet.getFunnels())) {
                removeFunnel(funnels.get(id));
            }
            for (final String id : getKeys(snippet.getLabels())) {
                removeLabel(labels.get(id));
            }
            for (final String id : getKeys(snippet.getProcessors())) {
                removeProcessor(processors.get(id));
            }
            for (final String id : getKeys(snippet.getRemoteProcessGroups())) {
                removeRemoteProcessGroup(remoteGroups.get(id));
            }
            for (final String id : getKeys(snippet.getProcessGroups())) {
                removeProcessGroup(processGroups.get(id));
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Null-tolerant accessor for the key set of a snippet component map.
     *
     * @param map the map whose keys are wanted; may be null
     * @return the key set of the map, or an empty Set when the map is null
     */
    private Set<String> getKeys(final Map<String, Revision> map) {
        if (map == null) {
            return Collections.emptySet();
        }

        return map.keySet();
    }

    /**
     * Moves the components referenced by the given snippet from this Process Group into the
     * destination group. Every check is performed before any component is transferred, and
     * the whole operation is performed while holding this group's write lock.
     *
     * @param snippet     the snippet identifying the components to move
     * @param destination the group that receives the components
     * @throws IllegalStateException if the snippet is connected to a component outside of the snippet,
     *                               or if a {@link LocalPort} would be moved into the root group; the
     *                               delegated verification calls may throw as well
     */
    @Override
    public void move(final Snippet snippet, final ProcessGroup destination) {
        writeLock.lock();
        try {
            verifyContents(snippet);
            verifyDestinationNotInSnippet(snippet, destination);
            SnippetUtils.verifyNoVersionControlConflicts(snippet, this, destination);

            if (!isDisconnected(snippet)) {
                throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved.");
            }

            // Reject the move when the destination is the root group and any selected port is a LocalPort
            if (destination.isRootGroup() && (
                snippet.getInputPorts().keySet().stream().map(this::getInputPort).anyMatch(port -> port instanceof LocalPort)
                    || snippet.getOutputPorts().keySet().stream().map(this::getOutputPort).anyMatch(port -> port instanceof LocalPort))) {
                throw new IllegalStateException("Cannot move local Ports into the root group");
            }

            onComponentModified();

            // Each component is taken out of this group's map and handed straight to the destination;
            // connections are transferred last.
            for (final String id : getKeys(snippet.getInputPorts())) {
                destination.addInputPort(inputPorts.remove(id));
            }
            for (final String id : getKeys(snippet.getOutputPorts())) {
                destination.addOutputPort(outputPorts.remove(id));
            }
            for (final String id : getKeys(snippet.getFunnels())) {
                destination.addFunnel(funnels.remove(id));
            }
            for (final String id : getKeys(snippet.getLabels())) {
                destination.addLabel(labels.remove(id));
            }
            for (final String id : getKeys(snippet.getProcessGroups())) {
                destination.addProcessGroup(processGroups.remove(id));
            }
            for (final String id : getKeys(snippet.getProcessors())) {
                destination.addProcessor(processors.remove(id));
            }
            for (final String id : getKeys(snippet.getRemoteProcessGroups())) {
                destination.addRemoteProcessGroup(remoteGroups.remove(id));
            }
            for (final String id : getKeys(snippet.getConnections())) {
                destination.inheritConnection(connections.remove(id));
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Resolves the Input Ports, Output Ports, Funnels and Processors referenced by the snippet
     * against this group.
     *
     * @param snippet the snippet whose connectable components are wanted
     * @return a new Set holding the resolved components
     */
    private Set<Connectable> getAllConnectables(final Snippet snippet) {
        final Set<Connectable> resolved = new HashSet<>();

        getKeys(snippet.getInputPorts()).forEach(portId -> resolved.add(getInputPort(portId)));
        getKeys(snippet.getOutputPorts()).forEach(portId -> resolved.add(getOutputPort(portId)));
        getKeys(snippet.getFunnels()).forEach(funnelId -> resolved.add(getFunnel(funnelId)));
        getKeys(snippet.getProcessors()).forEach(processorId -> resolved.add(getProcessor(processorId)));

        return resolved;
    }

    /**
     * Determines whether the snippet is self-contained: every connection touching a selected
     * connectable must be part of the snippet, and both ends of every selected connection must
     * be a selected connectable or a connectable within a selected Process Group.
     *
     * @param snippet the snippet to inspect
     * @return true if no selected component is connected to a component outside of the snippet
     */
    private boolean isDisconnected(final Snippet snippet) {
        final Set<Connectable> selected = getAllConnectables(snippet);

        // The ports of selected Remote Process Groups count as selected connectables too
        for (final String remoteGroupId : getKeys(snippet.getRemoteProcessGroups())) {
            final RemoteProcessGroup selectedRemoteGroup = getRemoteProcessGroup(remoteGroupId);
            selected.addAll(selectedRemoteGroup.getInputPorts());
            selected.addAll(selectedRemoteGroup.getOutputPorts());
        }

        final Set<String> selectedConnectionIds = snippet.getConnections().keySet();
        for (final Connectable component : selected) {
            final boolean incomingAllSelected = component.getIncomingConnections().stream()
                .allMatch(incoming -> selectedConnectionIds.contains(incoming.getIdentifier()));
            if (!incomingAllSelected) {
                return false;
            }

            final boolean outgoingAllSelected = component.getConnections().stream()
                .allMatch(outgoing -> selectedConnectionIds.contains(outgoing.getIdentifier()));
            if (!outgoingAllSelected) {
                return false;
            }
        }

        // Widen the set with everything inside the selected Process Groups before checking connection endpoints
        final Set<Connectable> reachable = new HashSet<>(selected);
        for (final String groupId : snippet.getProcessGroups().keySet()) {
            final ProcessGroup selectedGroup = getProcessGroup(groupId);
            reachable.addAll(findAllConnectables(selectedGroup, true));
        }

        for (final String connectionId : selectedConnectionIds) {
            final Connection selectedConnection = getConnection(connectionId);
            final boolean bothEndsReachable = reachable.contains(selectedConnection.getSource())
                && reachable.contains(selectedConnection.getDestination());
            if (!bothEndsReachable) {
                return false;
            }
        }

        return true;
    }

    /**
     * @return the connectables (remote ports included), Process Groups, Remote Process Groups
     *         and Labels found in this group and its nested groups
     */
    @Override
    public Set<Positionable> findAllPositionables() {
        final Set<Positionable> collected = new HashSet<>();

        collected.addAll(findAllConnectables(this, true));

        final List<ProcessGroup> nestedGroups = findAllProcessGroups();
        collected.addAll(nestedGroups);

        collected.addAll(findAllRemoteProcessGroups());
        collected.addAll(findAllLabels());

        return collected;
    }

    /**
     * Gathers the Input Ports, Output Ports, Funnels and Processors of the given group and of
     * every nested group.
     *
     * @param group              the group at which to begin
     * @param includeRemotePorts whether the ports of Remote Process Groups are gathered as well
     * @return a new Set holding the gathered components
     */
    private Set<Connectable> findAllConnectables(final ProcessGroup group, final boolean includeRemotePorts) {
        final Set<Connectable> gathered = new HashSet<>();
        gathered.addAll(group.getInputPorts());
        gathered.addAll(group.getOutputPorts());
        gathered.addAll(group.getFunnels());
        gathered.addAll(group.getProcessors());

        if (includeRemotePorts) {
            group.getRemoteProcessGroups().forEach(remoteGroup -> {
                gathered.addAll(remoteGroup.getInputPorts());
                gathered.addAll(remoteGroup.getOutputPorts());
            });
        }

        group.getProcessGroups().forEach(child -> gathered.addAll(findAllConnectables(child, includeRemotePorts)));

        return gathered;
    }

    /**
     * Checks that each ID listed in the snippet identifies a component held directly by this
     * ProcessGroup. The component kinds are checked one after another, so the first unknown
     * ID encountered determines the failure.
     *
     * @param snippet the snippet to check
     * @throws NullPointerException  if the snippet is null
     * @throws IllegalStateException if the snippet lists an ID for a component that is not
     *                               part of this ProcessGroup
     */
    private void verifyContents(final Snippet snippet) throws NullPointerException, IllegalStateException {
        final Snippet checked = requireNonNull(snippet);

        verifyAllKeysExist(checked.getInputPorts().keySet(), inputPorts, "Input Port");
        verifyAllKeysExist(checked.getOutputPorts().keySet(), outputPorts, "Output Port");
        verifyAllKeysExist(checked.getFunnels().keySet(), funnels, "Funnel");
        verifyAllKeysExist(checked.getLabels().keySet(), labels, "Label");
        verifyAllKeysExist(checked.getProcessGroups().keySet(), processGroups, "Process Group");
        verifyAllKeysExist(checked.getProcessors().keySet(), processors, "Processor");
        verifyAllKeysExist(checked.getRemoteProcessGroups().keySet(), remoteGroups, "Remote Process Group");
        verifyAllKeysExist(checked.getConnections().keySet(), connections, "Connection");
    }

    /**
     * Guards against a move request whose destination is one of the Process Groups being moved.
     * No check is made when the snippet has no Process Group map or when the destination is null.
     *
     * @param snippet     the snippet that is about to be moved
     * @param destination the Process Group the snippet would be moved into
     * @throws IllegalStateException if one of the snippet's Process Group IDs equals the identifier of the destination
     */
    private void verifyDestinationNotInSnippet(final Snippet snippet, final ProcessGroup destination) throws IllegalStateException {
        if (snippet.getProcessGroups() == null || destination == null) {
            return;
        }

        for (final String processGroupId : snippet.getProcessGroups().keySet()) {
            if (processGroupId.equals(destination.getIdentifier())) {
                throw new IllegalStateException("Unable to move Process Group into itself.");
            }
        }
    }

    /**
     * <p>
     * Checks that every ID in the given set is present as a key of the given
     * Map. The first ID that is missing causes an {@link IllegalStateException}
     * whose message names both the offending ID and the Component Type.
     * </p>
     *
     * <p>
     * A null set of ids is accepted and results in no validation at all.
     * </p>
     *
     * @param ids           the ids to look up, may be null
     * @param map           the map whose keys are the known ids
     * @param componentType human-readable component type used in the error message
     */
    private void verifyAllKeysExist(final Set<String> ids, final Map<String, ?> map, final String componentType) {
        if (ids == null) {
            return;
        }

        for (final String componentId : ids) {
            if (map.containsKey(componentId)) {
                continue;
            }

            throw new IllegalStateException("ID " + componentId + " does not refer to a(n) " + componentType + " in this ProcessGroup");
        }
    }

    /**
     * Verifies that a template with the given name may be added.
     *
     * @param name the proposed template name
     * @throws IllegalArgumentException if the name is blank
     * @throws IllegalStateException    if a template found from the root group already carries the same name
     */
    @Override
    public void verifyCanAddTemplate(final String name) {
        // a template must carry a non-blank name
        if (StringUtils.isBlank(name)) {
            throw new IllegalArgumentException("Template name cannot be blank.");
        }

        // the name is compared against every template found starting from the root group
        final boolean nameTaken = getRoot().findAllTemplates().stream()
                .map(template -> template.getDetails())
                .anyMatch(details -> name.equals(details.getName()));

        if (nameTaken) {
            throw new IllegalStateException(String.format("A template named '%s' already exists.", name));
        }
    }

    /**
     * Verifies that this Process Group can be deleted without ignoring any port connections.
     * Delegates to {@link #verifyCanDelete(boolean)} with {@code false}.
     */
    @Override
    public void verifyCanDelete() {
        final boolean ignorePortConnections = false;
        verifyCanDelete(ignorePortConnections);
    }

    /**
     * Verifies that this Process Group can be deleted, never ignoring templates.
     * Delegates to {@link #verifyCanDelete(boolean, boolean)} with {@code false} for the template flag.
     *
     * @param ignorePortConnections passed through unchanged as the connection flag of the delegate
     */
    @Override
    public void verifyCanDelete(final boolean ignorePortConnections) {
        final boolean ignoreTemplates = false;
        verifyCanDelete(ignorePortConnections, ignoreTemplates);
    }

    /**
     * Verifies, while holding the read lock, that this Process Group and everything it contains can be deleted.
     *
     * @param ignoreConnections when {@code true}, the connections attached to this group's own input and output
     *                          ports are not examined
     * @param ignoreTemplates   when {@code true}, templates held by this group (and, through the recursive call,
     *                          by its child groups) do not prevent deletion
     * @throws IllegalStateException if a contained component cannot be deleted, if templates are present and not
     *                               ignored, or if a port connection prevents deletion
     */
    @Override
    public void verifyCanDelete(final boolean ignoreConnections, final boolean ignoreTemplates) {
        readLock.lock();
        try {
            // Every component held directly by this group must itself be deletable
            inputPorts.values().forEach(inputPort -> inputPort.verifyCanDelete(true));
            outputPorts.values().forEach(outputPort -> outputPort.verifyCanDelete(true));
            processors.values().forEach(processor -> processor.verifyCanDelete(true));
            connections.values().forEach(conn -> conn.verifyCanDelete());
            controllerServices.values().forEach(service -> service.verifyCanDelete());

            // Nested child groups are deleted together with their ports, so their
            // input/output port connections can be ignored.
            processGroups.values().forEach(child -> child.verifyCanDelete(true, ignoreTemplates));

            final boolean templatesBlockDeletion = !ignoreTemplates && !templates.isEmpty();
            if (templatesBlockDeletion) {
                throw new IllegalStateException(String.format("Cannot delete Process Group because it contains %s Templates. The Templates must be deleted first.", templates.size()));
            }

            if (ignoreConnections) {
                return;
            }

            for (final Port inputPort : inputPorts.values()) {
                for (final Connection incoming : inputPort.getIncomingConnections()) {
                    if (!incoming.getSource().equals(inputPort)) {
                        throw new IllegalStateException("Cannot delete Process Group because Input Port " + inputPort.getIdentifier()
                                + " has at least one incoming connection from a component outside of the Process Group. Delete this connection first.");
                    }

                    incoming.verifyCanDelete();
                }
            }

            for (final Port outputPort : outputPorts.values()) {
                for (final Connection outgoing : outputPort.getConnections()) {
                    if (!outgoing.getDestination().equals(outputPort)) {
                        throw new IllegalStateException("Cannot delete Process Group because Output Port " + outputPort.getIdentifier()
                                + " has at least one outgoing connection to a component outside of the Process Group. Delete this connection first.");
                    }

                    outgoing.verifyCanDelete();
                }
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Verifies that the given component may be stopped. A component whose scheduled state is
     * {@link ScheduledState#DISABLED} cannot be stopped.
     *
     * @param connectable the component to check
     * @throws IllegalStateException if the component is currently disabled
     */
    @Override
    public void verifyCanStop(Connectable connectable) {
        final ScheduledState state = connectable.getScheduledState();
        if (state == ScheduledState.DISABLED) {
            // The message announces an id, so report the identifier rather than the component's toString()
            throw new IllegalStateException("Cannot stop component with id " + connectable.getIdentifier() + " because it is currently disabled.");
        }
    }

    /**
     * Verifies that this Process Group may be stopped. This implementation performs no checks.
     */
    @Override
    public void verifyCanStop() {
        // intentionally empty: no check is performed here
    }

    /**
     * Verifies, while holding the read lock, that the given component may be started. Only components whose
     * scheduled state is {@link ScheduledState#STOPPED} are examined; any other state passes without a check.
     *
     * @param connectable the component to check
     * @throws IllegalStateException if the scheduler still reports active threads for the stopped component,
     *                               or if the component's own start verification fails
     */
    @Override
    public void verifyCanStart(Connectable connectable) {
        readLock.lock();
        try {
            if (connectable.getScheduledState() != ScheduledState.STOPPED) {
                return;
            }

            // A stopped component that still has active threads has not finished stopping yet
            if (scheduler.getActiveThreadCount(connectable) > 0) {
                throw new IllegalStateException("Cannot start component with id " + connectable.getIdentifier() + " because it is currently stopping");
            }

            connectable.verifyCanStart();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Verifies, while holding the read lock, that every connectable component in this Process Group and its
     * descendant groups may be started. Ports of Remote Process Groups are not included in the check.
     *
     * @throws IllegalStateException if any of the components cannot be started
     */
    @Override
    public void verifyCanStart() {
        readLock.lock();
        try {
            final Set<Connectable> localConnectables = findAllConnectables(this, false);
            localConnectables.forEach(component -> verifyCanStart(component));
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Verifies, while holding the read lock, that the given snippet can be deleted from this Process Group.
     * The snippet must belong to this group, must be disconnected from components outside of the snippet, and
     * every component it references must exist in this group. Connections, funnels, ports, Process Groups,
     * processors and Remote Process Groups must additionally pass their own deletion verification; labels only
     * need to exist.
     *
     * @param snippet the snippet to check
     * @throws IllegalStateException if any of the conditions above is not met
     */
    @Override
    public void verifyCanDelete(final Snippet snippet) throws IllegalStateException {
        readLock.lock();
        try {
            if (!id.equals(snippet.getParentGroupId())) {
                throw new IllegalStateException("Snippet belongs to ProcessGroup with ID " + snippet.getParentGroupId() + " but this ProcessGroup has id " + id);
            }

            if (!isDisconnected(snippet)) {
                // This is the delete check, so the message must talk about deletion rather than moving
                throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be deleted.");
            }

            for (final String connectionId : snippet.getConnections().keySet()) {
                requireSnippetComponent(getConnection(connectionId), "Connection", connectionId).verifyCanDelete();
            }

            for (final String funnelId : snippet.getFunnels().keySet()) {
                requireSnippetComponent(getFunnel(funnelId), "Funnel", funnelId).verifyCanDelete(true);
            }

            for (final String inputPortId : snippet.getInputPorts().keySet()) {
                requireSnippetComponent(getInputPort(inputPortId), "Input Port", inputPortId).verifyCanDelete(true);
            }

            for (final String labelId : snippet.getLabels().keySet()) {
                // Labels are only checked for existence; no further verification is made for them
                requireSnippetComponent(getLabel(labelId), "Label", labelId);
            }

            for (final String outputPortId : snippet.getOutputPorts().keySet()) {
                requireSnippetComponent(getOutputPort(outputPortId), "Output Port", outputPortId).verifyCanDelete(true);
            }

            for (final String groupId : snippet.getProcessGroups().keySet()) {
                requireSnippetComponent(getProcessGroup(groupId), "Process Group", groupId).verifyCanDelete(true);
            }

            for (final String processorId : snippet.getProcessors().keySet()) {
                requireSnippetComponent(getProcessor(processorId), "Processor", processorId).verifyCanDelete(true);
            }

            for (final String remoteGroupId : snippet.getRemoteProcessGroups().keySet()) {
                requireSnippetComponent(getRemoteProcessGroup(remoteGroupId), "Remote Process Group", remoteGroupId).verifyCanDelete(true);
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the given component, failing when the lookup that produced it found nothing.
     *
     * @param component     the result of looking up {@code componentId} in this Process Group, may be null
     * @param componentType human-readable component type used in the error message
     * @param componentId   the ID that the snippet referenced
     * @param <T>           the type of the component
     * @return the component, never null
     * @throws IllegalStateException if the component is null
     */
    private <T> T requireSnippetComponent(final T component, final String componentType, final String componentId) {
        if (component == null) {
            throw new IllegalStateException("Snippet references " + componentType + " with ID " + componentId + ", which does not exist in this ProcessGroup");
        }

        return component;
    }

    /**
     * Verifies, while holding the read lock, that the given snippet can be moved from this Process Group into
     * the given destination group.
     *
     * <p>
     * The snippet must belong to this group, reference only components of this group, not contain the
     * destination itself, and be disconnected from components outside of the snippet. In addition, none of its
     * ports may clash by name with a port of the destination, every controller service that a moved processor
     * currently uses from this group must also be available in the destination, and a running processor that
     * references a Parameter may not be moved to a group bound to a different Parameter Context.
     * </p>
     *
     * @param snippet         the snippet to move
     * @param newProcessGroup the destination Process Group
     * @throws IllegalStateException if any of the conditions above is not met
     */
    @Override
    public void verifyCanMove(final Snippet snippet, final ProcessGroup newProcessGroup) throws IllegalStateException {
        readLock.lock();
        try {
            if (!id.equals(snippet.getParentGroupId())) {
                throw new IllegalStateException("Snippet belongs to ProcessGroup with ID " + snippet.getParentGroupId() + " but this ProcessGroup has id " + id);
            }

            verifyContents(snippet);
            verifyDestinationNotInSnippet(snippet, newProcessGroup);

            if (!isDisconnected(snippet)) {
                throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved.");
            }

            for (final String inputPortId : snippet.getInputPorts().keySet()) {
                final String portName = getInputPort(inputPortId).getName();

                if (newProcessGroup.getInputPortByName(portName) != null) {
                    throw new IllegalStateException("Cannot perform Move Operation because of a naming conflict with another port in the destination Process Group");
                }
            }

            for (final String outputPortId : snippet.getOutputPorts().keySet()) {
                final String portName = getOutputPort(outputPortId).getName();

                if (newProcessGroup.getOutputPortByName(portName) != null) {
                    throw new IllegalStateException("Cannot perform Move Operation because of a naming conflict with another port in the destination Process Group");
                }
            }

            final ParameterContext currentParameterContext = getParameterContext();
            final String currentParameterContextId = currentParameterContext == null ? null : currentParameterContext.getIdentifier();
            final ParameterContext destinationParameterContext = newProcessGroup.getParameterContext();
            final String destinationParameterContextId = destinationParameterContext == null ? null : destinationParameterContext.getIdentifier();

            final boolean parameterContextsDiffer = !Objects.equals(currentParameterContextId, destinationParameterContextId);

            for (final ProcessorNode processorNode : findAllProcessors(snippet)) {
                // If Parameter is used and the Parameter Contexts are different, then the Processor must be stopped.
                // The condition does not depend on any single property, so it is evaluated once per processor
                // instead of once per property descriptor.
                if (parameterContextsDiffer && processorNode.isRunning() && processorNode.isReferencingParameter()) {
                    throw new IllegalStateException("Cannot perform Move Operation because Processor with ID " + processorNode.getIdentifier() + " references one or more Parameters, and the " +
                        "Processor is running, and the destination Process Group is bound to a different Parameter Context than the current Process Group. This would result in changing the " +
                        "configuration of the Processor while it is running, which is not allowed. You must first stop the Processor before moving it to another Process Group if the " +
                        "destination's Parameter Context is not the same.");
                }

                for (final PropertyDescriptor descriptor : processorNode.getProperties().keySet()) {
                    final Class<? extends ControllerService> serviceDefinition = descriptor.getControllerServiceDefinition();

                    // only descriptors that identify a controller service are of interest
                    if (serviceDefinition == null) {
                        continue;
                    }

                    // skip properties for which the processor is not configured with a service
                    final String serviceId = processorNode.getEffectivePropertyValue(descriptor);
                    if (serviceId == null) {
                        continue;
                    }

                    // get all the available services, as seen from this group and from the destination
                    final Set<String> currentControllerServiceIds = controllerServiceProvider.getControllerServiceIdentifiers(serviceDefinition, getIdentifier());
                    final Set<String> proposedControllerServiceIds = controllerServiceProvider.getControllerServiceIdentifiers(serviceDefinition, newProcessGroup.getIdentifier());

                    // ensure the configured service is an allowed service if it's still a valid service
                    if (currentControllerServiceIds.contains(serviceId) && !proposedControllerServiceIds.contains(serviceId)) {
                        throw new IllegalStateException("Cannot perform Move Operation because Processor with ID " + processorNode.getIdentifier()
                                + " references a service that is not available in the destination Process Group");
                    }
                }
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Collects the processors covered by the given snippet: those it references directly, looked up in this
     * group, plus all processors found within each Process Group it references.
     *
     * @param snippet the snippet whose processors are wanted
     * @return a new set containing the processors that were found
     */
    private Set<ProcessorNode> findAllProcessors(final Snippet snippet) {
        final Set<ProcessorNode> found = new HashSet<>();

        // Processors named directly by the snippet
        for (final String processorId : snippet.getProcessors().keySet()) {
            found.add(getProcessor(processorId));
        }

        // Processors contained in the Process Groups named by the snippet
        snippet.getProcessGroups().keySet().forEach(groupId -> found.addAll(getProcessGroup(groupId).findAllProcessors()));

        return found;
    }

    /**
     * @return the Parameter Context currently held by this Process Group, as stored by
     *         {@link #setParameterContext(ParameterContext)}
     */
    @Override
    public ParameterContext getParameterContext() {
        return this.parameterContext;
    }

    /**
     * Replaces the Parameter Context of this Process Group after verifying that the change is allowed, and
     * notifies components when the replacement changes any parameter.
     *
     * @param parameterContext the new Parameter Context
     */
    @Override
    public void setParameterContext(final ParameterContext parameterContext) {
        verifyCanSetParameterContext(parameterContext);

        // Work out which parameters differ between the old and new context so components can be updated.
        final ParameterContext previousContext = this.parameterContext;
        final Map<String, ParameterUpdate> updatedParameters = mapParameterUpdates(previousContext, parameterContext);
        LOG.debug("Parameter Context for {} changed from {} to {}. This resulted in {} Parameter Updates ({}). Notifying Processors/Controller Services of the updates.",
            this, previousContext, parameterContext, updatedParameters.size(), updatedParameters);

        this.parameterContext = parameterContext;

        if (updatedParameters.isEmpty()) {
            return;
        }

        // At least one parameter changed, so let the components know
        onParameterContextUpdated(updatedParameters);
    }

    /**
     * Passes the given parameter updates, while holding the read lock, to the processors returned by
     * {@code getProcessors()} and then to the controller services returned by {@code getControllerServices(false)}.
     *
     * @param updatedParameters the parameter updates, keyed by parameter name
     */
    @Override
    public void onParameterContextUpdated(final Map<String, ParameterUpdate> updatedParameters) {
        readLock.lock();
        try {
            for (final ProcessorNode processor : getProcessors()) {
                processor.onParametersModified(updatedParameters);
            }

            for (final ControllerServiceNode service : getControllerServices(false)) {
                service.onParametersModified(updatedParameters);
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Computes the parameter updates that result from replacing one Parameter Context with another.
     *
     * @param previousParameterContext the context that was in place, may be null
     * @param updatedParameterContext  the context that takes its place, may be null
     * @return the updates keyed by parameter name; empty when both contexts are null. When only one context is
     *         present, every one of its parameters yields an update with null as the missing side's value.
     */
    private Map<String, ParameterUpdate> mapParameterUpdates(final ParameterContext previousParameterContext, final ParameterContext updatedParameterContext) {
        final boolean hadContext = previousParameterContext != null;
        final boolean hasContext = updatedParameterContext != null;

        if (!hadContext && !hasContext) {
            return Collections.emptyMap();
        }
        if (!hasContext) {
            // Context removed: every previous parameter goes from its value to null
            return createParameterUpdates(previousParameterContext, (descriptor, value) -> new StandardParameterUpdate(descriptor.getName(), value, null, descriptor.isSensitive()));
        }
        if (!hadContext) {
            // Context newly assigned: every parameter goes from null to its value
            return createParameterUpdates(updatedParameterContext, (descriptor, value) -> new StandardParameterUpdate(descriptor.getName(), null, value, descriptor.isSensitive()));
        }

        final Map<String, ParameterUpdate> updates = new HashMap<>();

        // Parameters of the updated context whose value differs from the previous context (or that are new)
        updatedParameterContext.getParameters().forEach((updatedDescriptor, updatedParameter) -> {
            final Optional<Parameter> previousParameter = previousParameterContext.getParameter(updatedDescriptor);
            final String oldValue = previousParameter.map(Parameter::getValue).orElse(null);
            final String newValue = updatedParameter.getValue();

            if (Objects.equals(oldValue, newValue)) {
                return;
            }

            final ParameterUpdate change = new StandardParameterUpdate(updatedDescriptor.getName(), oldValue, newValue, updatedDescriptor.isSensitive());
            updates.put(updatedDescriptor.getName(), change);
        });

        // Parameters that exist only in the previous context get an update with null as the new value.
        // Parameters present in both contexts were already handled above.
        previousParameterContext.getParameters().forEach((previousDescriptor, previousParameter) -> {
            final Optional<Parameter> stillPresent = updatedParameterContext.getParameter(previousDescriptor);
            if (!stillPresent.isPresent()) {
                final ParameterUpdate removal = new StandardParameterUpdate(previousDescriptor.getName(), previousParameter.getValue(), null, previousDescriptor.isSensitive());
                updates.put(previousDescriptor.getName(), removal);
            }
        });

        return updates;
    }

    /**
     * Builds one parameter update for every parameter of the given context.
     *
     * @param parameterContext      the context whose parameters are converted
     * @param parameterUpdateMapper creates the update from a parameter's descriptor and its value
     * @return a new map of the created updates, keyed by parameter name
     */
    private Map<String, ParameterUpdate> createParameterUpdates(final ParameterContext parameterContext, final BiFunction<ParameterDescriptor, String, ParameterUpdate> parameterUpdateMapper) {
        final Map<String, ParameterUpdate> updates = new HashMap<>();

        parameterContext.getParameters().forEach((descriptor, parameter) -> {
            final ParameterUpdate update = parameterUpdateMapper.apply(descriptor, parameter.getValue());
            updates.put(descriptor.getName(), update);
        });

        return updates;
    }


    /**
     * Verifies, while holding the read lock, that this Process Group may be bound to the given Parameter Context.
     * Nothing is checked when the given context equals the current one. Otherwise every processor and controller
     * service of this group that references a Parameter must be stopped or disabled respectively, and must pass
     * the parameter sensitivity check against the new context.
     *
     * @param parameterContext the proposed Parameter Context, may be null
     * @throws IllegalStateException if a referencing processor is running, a referencing controller service is
     *                               not disabled, or a sensitivity mismatch is found
     */
    @Override
    public void verifyCanSetParameterContext(final ParameterContext parameterContext) {
        readLock.lock();
        try {
            final boolean unchanged = Objects.equals(parameterContext, getParameterContext());
            if (unchanged) {
                return;
            }

            for (final ProcessorNode processor : processors.values()) {
                if (processor.isReferencingParameter()) {
                    if (processor.isRunning()) {
                        throw new IllegalStateException("Cannot change Parameter Context for " + this + " because " + processor + " is referencing at least one Parameter and is running");
                    }

                    verifyParameterSensitivityIsValid(processor, parameterContext);
                }
            }

            for (final ControllerServiceNode service : controllerServices.values()) {
                if (service.isReferencingParameter()) {
                    if (service.getState() != ControllerServiceState.DISABLED) {
                        throw new IllegalStateException("Cannot change Parameter Context for " + this + " because " + service + " is referencing at least one Parameter and is not disabled");
                    }

                    verifyParameterSensitivityIsValid(service, parameterContext);
                }
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Verifies that, for every Parameter the component references that exists in the given Parameter Context,
     * the sensitivity of the Parameter matches the sensitivity of the referencing property. References to
     * Parameters that the context does not contain are not checked, and a null context is accepted as is.
     *
     * @param component        the component whose property configurations are inspected
     * @param parameterContext the Parameter Context to validate against, may be null
     * @throws IllegalStateException if a sensitive Parameter is referenced from a non-sensitive property, or a
     *                               non-sensitive Parameter is referenced from a sensitive property
     */
    private void verifyParameterSensitivityIsValid(final ComponentNode component, final ParameterContext parameterContext) {
        if (parameterContext == null) {
            return;
        }

        final Map<PropertyDescriptor, PropertyConfiguration> properties = component.getProperties();
        for (final Map.Entry<PropertyDescriptor, PropertyConfiguration> entry : properties.entrySet()) {
            final PropertyConfiguration configuration = entry.getValue();
            if (configuration == null) {
                continue;
            }

            final PropertyDescriptor propertyDescriptor = entry.getKey();

            for (final ParameterReference reference : configuration.getParameterReferences()) {
                final String paramName = reference.getParameterName();
                final Optional<Parameter> parameter = parameterContext.getParameter(paramName);

                // A Parameter that is absent from the context has no sensitivity to compare against
                if (!parameter.isPresent()) {
                    continue;
                }

                final boolean parameterSensitive = parameter.get().getDescriptor().isSensitive();
                final boolean propertySensitive = propertyDescriptor.isSensitive();

                if (parameterSensitive && !propertySensitive) {
                    throw new IllegalStateException("Cannot change Parameter Context for " + this + " because " + component + " is referencing Parameter '" + paramName
                        + "' from the '" + propertyDescriptor.getDisplayName() + "' property and the Parameter is sensitive. Sensitive Parameters may only be referenced " +
                        "by sensitive properties.");
                }

                if (!parameterSensitive && propertySensitive) {
                    throw new IllegalStateException("Cannot change Parameter Context for " + this + " because " + component + " is referencing Parameter '" + paramName
                        + "' from a sensitive property and the Parameter is not sensitive. Sensitive properties may only reference " +
                        "Sensitive Parameters.");
                }
            }
        }
    }

    /**
     * @return the variable registry held by this Process Group
     */
    @Override
    public MutableVariableRegistry getVariableRegistry() {
        return this.variableRegistry;
    }

    /**
     * Verifies that the given variable values may be applied to this Process Group. A null or empty map is
     * accepted without any check. Otherwise, while holding the read lock, the variables whose value would
     * actually change are determined, and none of them may be referenced by a running processor or an active
     * controller service of this group, nor by a running or active component of a child group that does not
     * override the variable.
     *
     * @param updatedVariables the proposed variable values, keyed by variable name
     * @throws IllegalStateException if a changed variable is referenced by a running or active component
     */
    @Override
    public void verifyCanUpdateVariables(final Map<String, String> updatedVariables) {
        final boolean nothingRequested = updatedVariables == null || updatedVariables.isEmpty();
        if (nothingRequested) {
            return;
        }

        readLock.lock();
        try {
            final Set<String> changedNames = getUpdatedVariables(updatedVariables);
            if (changedNames.isEmpty()) {
                return;
            }

            // Running Processors of this group must not reference a changed variable
            for (final ProcessorNode processor : getProcessors()) {
                if (processor.isRunning()) {
                    for (final VariableImpact impact : getVariableImpact(processor)) {
                        for (final String variableName : changedNames) {
                            if (impact.isImpacted(variableName)) {
                                throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + processor + ", which is currently running");
                            }
                        }
                    }
                }
            }

            // Active Controller Services of this group must not reference a changed variable
            for (final ControllerServiceNode service : getControllerServices(false)) {
                if (service.isActive()) {
                    for (final VariableImpact impact : getVariableImpact(service)) {
                        for (final String variableName : changedNames) {
                            if (impact.isImpacted(variableName)) {
                                throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + service + ", which is currently running");
                            }
                        }
                    }
                }
            }

            // A child Process Group that holds its own value for a variable overrides it, which means its
            // components reference a different variable. Only child groups that do not override the
            // variable contribute their affected components to the check.
            for (final ProcessGroup childGroup : getProcessGroups()) {
                for (final String variableName : changedNames) {
                    final ComponentVariableRegistry childRegistry = childGroup.getVariableRegistry();
                    final VariableDescriptor childKey = childRegistry.getVariableKey(variableName);
                    if (childRegistry.getVariableMap().containsKey(childKey)) {
                        continue;
                    }

                    for (final ComponentNode affectedComponent : childGroup.getComponentsAffectedByVariable(variableName)) {
                        if (affectedComponent instanceof ProcessorNode) {
                            if (((ProcessorNode) affectedComponent).isRunning()) {
                                throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + affectedComponent + ", which is currently running.");
                            }
                        } else if (affectedComponent instanceof ControllerServiceNode) {
                            if (((ControllerServiceNode) affectedComponent).isActive()) {
                                throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + affectedComponent + ", which is currently active.");
                            }
                        } else if (affectedComponent instanceof ReportingTaskNode) {
                            if (((ReportingTaskNode) affectedComponent).isRunning()) {
                                throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + affectedComponent + ", which is currently running.");
                            }
                        }
                    }
                }
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * @return the Versioned Component ID currently held for this Process Group, or an empty Optional when
     *         none is set
     */
    @Override
    public Optional<String> getVersionedComponentId() {
        final String currentId = versionedComponentId.get();
        return Optional.ofNullable(currentId);
    }

    /**
     * Sets the Versioned Component ID of this Process Group while holding the write lock. An ID may be
     * assigned when none is set, re-assigned to the same value, or cleared by passing null; replacing an
     * existing ID with a different non-null one is rejected.
     *
     * @param componentId the new Versioned Component ID, or null to clear it
     * @throws IllegalStateException if a different Versioned Component ID is already set
     */
    @Override
    public void setVersionedComponentId(final String componentId) {
        writeLock.lock();
        try {
            final String existingId = versionedComponentId.get();

            // Same ID as before: nothing to do
            if (existingId != null && existingId.equals(componentId)) {
                return;
            }

            // Either nothing was assigned yet, or the existing ID is being cleared
            if (existingId == null || componentId == null) {
                versionedComponentId.set(componentId);
                return;
            }

            throw new IllegalStateException(this + " is already under version control with a different Versioned Component ID");
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Finds the components that are affected by the variable with the given name: processors and controller
     * services of this group whose variable impact includes the variable, everything that recursively
     * references such a controller service, and the affected components of each child group that does not
     * override the variable.
     *
     * @param variableName the name of the variable
     * @return a new set of the affected components
     */
    @Override
    public Set<ComponentNode> getComponentsAffectedByVariable(final String variableName) {
        final Set<ComponentNode> impactedComponents = new HashSet<>();

        // Processors of this group that reference the variable
        for (final ProcessorNode processor : getProcessors()) {
            for (final VariableImpact impact : getVariableImpact(processor)) {
                if (!impact.isImpacted(variableName)) {
                    continue;
                }

                impactedComponents.add(processor);
            }
        }

        // Controller Services of this group that reference the variable. Whatever references such a
        // service is affected too, so its recursive references are added along with it.
        for (final ControllerServiceNode service : getControllerServices(false)) {
            for (final VariableImpact impact : getVariableImpact(service)) {
                if (!impact.isImpacted(variableName)) {
                    continue;
                }

                impactedComponents.add(service);
                impactedComponents.addAll(service.getReferences().findRecursiveReferences(ComponentNode.class));
            }
        }

        // A child Process Group that holds its own value for the variable overrides it, so its components
        // reference a different variable and are left out. All other child groups contribute theirs.
        for (final ProcessGroup childGroup : getProcessGroups()) {
            final ComponentVariableRegistry childRegistry = childGroup.getVariableRegistry();
            final VariableDescriptor childKey = childRegistry.getVariableKey(variableName);
            if (childRegistry.getVariableMap().containsKey(childKey)) {
                continue;
            }

            impactedComponents.addAll(childGroup.getComponentsAffectedByVariable(variableName));
        }

        return impactedComponents;
    }


    /**
     * Determines which of the proposed variables differ from the values currently held by this
     * group's variable registry.
     *
     * @param newVariableValues the proposed variable names and values
     * @return the names of all variables whose proposed value is not equal to the current value
     */
    private Set<String> getUpdatedVariables(final Map<String, String> newVariableValues) {
        final MutableVariableRegistry currentRegistry = getVariableRegistry();

        return newVariableValues.entrySet().stream()
                .filter(proposed -> !Objects.equals(proposed.getValue(), currentRegistry.getVariableValue(proposed.getKey())))
                .map(Map.Entry::getKey)
                .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Computes the variable impact of each property of the given component. For every property, the
     * effective value is used, falling back to the descriptor's default when no value is available.
     *
     * @param component the component whose property values should be inspected
     * @return one {@link VariableImpact} per effective property of the component
     */
    private List<VariableImpact> getVariableImpact(final ComponentNode component) {
        return component.getEffectivePropertyValues().keySet().stream()
                .map(descriptor -> {
                    String valueToInspect = component.getEffectivePropertyValue(descriptor);
                    if (valueToInspect == null) {
                        valueToInspect = descriptor.getDefaultValue();
                    }

                    return Query.prepare(valueToInspect).getVariableImpact();
                })
                .collect(Collectors.toList());
    }

    /**
     * Replaces/updates the variables of this Process Group after verifying that the update is allowed.
     * A {@code null} map is verified but otherwise ignored.
     *
     * @param variables the variable names and values to apply; values may be {@code null}
     */
    @Override
    public void setVariables(final Map<String, String> variables) {
        writeLock.lock();
        try {
            verifyCanUpdateVariables(variables);

            if (variables != null) {
                // Built with an explicit loop rather than Collectors.toMap because values may be null.
                final Map<VariableDescriptor, String> valuesByDescriptor = new HashMap<>();
                for (final Map.Entry<String, String> variable : variables.entrySet()) {
                    valuesByDescriptor.put(new VariableDescriptor(variable.getKey()), variable.getValue());
                }

                variableRegistry.setVariables(valuesByDescriptor);
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * @return the Version Control Information currently held for this Process Group, or
     *         {@code null} if none has been set
     */
    @Override
    public VersionControlInformation getVersionControlInformation() {
        final VersionControlInformation currentInfo = versionControlInfo.get();
        return currentInfo;
    }

    /**
     * Invalidates the cached set of local modifications. If this group holds no Version Control
     * Information of its own, the notification is first propagated to the parent group.
     */
    @Override
    public void onComponentModified() {
        final boolean directlyVersioned = this.versionControlInfo.get() != null;

        if (!directlyVersioned) {
            // Not under version control directly, so let the parent (if any) know about the change.
            final ProcessGroup enclosingGroup = parent.get();
            if (enclosingGroup != null) {
                enclosingGroup.onComponentModified();
            }
        }

        // We cannot simply flag the flow as modified: a property changed from 'A' to 'B' and back to 'A'
        // must be reported as unmodified. Clearing the cached differences forces them to be re-calculated
        // the next time the local modifications are requested.
        versionControlFields.setFlowDifferences(null);
    }


    /**
     * Places this Process Group under the given Version Control Information.
     * <p>
     * A new {@link StandardVersionControlInformation} is built from the supplied information, with the flow
     * snapshot passed through {@code stripContentsFromRemoteDescendantGroups}. The new instance overrides
     * {@code getRegistryName()} and {@code getStatus()} so that both are computed on demand. The stale /
     * locally-modified / sync-failure fields are seeded from the supplied status. Then, under the write lock,
     * the versioned component ids are applied, the new information is stored, the cached flow differences
     * are cleared, the parent (if any) is notified, and a framework task is submitted to synchronize with
     * the Flow Registry.
     *
     * @param versionControlInformation the Version Control Information to copy from
     * @param versionedComponentIds mapping of component identifier to Versioned Component ID; a null or empty
     *                              map leaves the existing Versioned Component IDs untouched
     */
    @Override
    public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final Map<String, String> versionedComponentIds) {
        final StandardVersionControlInformation svci = new StandardVersionControlInformation(
                versionControlInformation.getRegistryIdentifier(),
                versionControlInformation.getRegistryName(),
                versionControlInformation.getBucketIdentifier(),
                versionControlInformation.getFlowIdentifier(),
                versionControlInformation.getVersion(),
                stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true),
                versionControlInformation.getStatus()) {

            // Resolve the registry name at call time; fall back to the registry identifier
            // when no Flow Registry with that identifier can be found.
            @Override
            public String getRegistryName() {
                final String registryId = versionControlInformation.getRegistryIdentifier();
                final FlowRegistry registry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
                return registry == null ? registryId : registry.getName();
            }

            // Determines whether the group differs from the versioned flow, caching computed differences
            // in versionControlFields so that they are only re-calculated after being cleared.
            private boolean isModified() {
                // Version 0 is always reported as modified.
                if (versionControlInformation.getVersion() == 0) {
                    return true;
                }

                Set<FlowDifference> differences = versionControlFields.getFlowDifferences();
                if (differences == null) {
                    differences = getModifications();
                    if (differences == null) {
                        // No differences could be obtained; report unmodified and do not cache anything.
                        return false;
                    }

                    versionControlFields.setFlowDifferences(differences);
                }

                return !differences.isEmpty();
            }

            @Override
            public VersionedFlowStatus getStatus() {
                // If a sync failure explanation has been recorded, report SYNC_FAILURE with that explanation.
                final String syncFailureExplanation = versionControlFields.getSyncFailureExplanation();
                if (syncFailureExplanation != null) {
                    return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, syncFailureExplanation);
                }

                try {
                    final boolean modified = isModified();
                    if (!modified) {
                        // "Not modified" is only trustworthy when there is a snapshot to compare against.
                        final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get();
                        if (vci.getFlowSnapshot() == null) {
                            return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry");
                        }
                    }

                    final boolean stale = versionControlFields.isStale();

                    // Combine the 'modified' and 'stale' flags into a single flow state.
                    final VersionedFlowState flowState;
                    if (modified && stale) {
                        flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
                    } else if (modified) {
                        flowState = VersionedFlowState.LOCALLY_MODIFIED;
                    } else if (stale) {
                        flowState = VersionedFlowState.STALE;
                    } else {
                        flowState = VersionedFlowState.UP_TO_DATE;
                    }

                    return new StandardVersionedFlowStatus(flowState, flowState.getDescription());
                } catch (final Exception e) {
                    // NOTE(review): 'this' here is the anonymous VersionControlInformation instance, not the
                    // enclosing Process Group - confirm that this is the object intended to appear in the log.
                    LOG.warn("Could not correctly determine Versioned Flow Status for {}. Will consider state to be SYNC_FAILURE", this, e);
                    return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Could not properly determine flow status due to: " + e);
                }
            }
        };

        svci.setBucketName(versionControlInformation.getBucketName());
        svci.setFlowName(versionControlInformation.getFlowName());
        svci.setFlowDescription(versionControlInformation.getFlowDescription());

        // Seed the mutable version control fields from the supplied status. Note that this happens
        // before the write lock is acquired.
        final VersionedFlowState flowState = versionControlInformation.getStatus().getState();
        versionControlFields.setStale(flowState == VersionedFlowState.STALE || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE);
        versionControlFields.setLocallyModified(flowState == VersionedFlowState.LOCALLY_MODIFIED || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE);
        versionControlFields.setSyncFailureExplanation(flowState == VersionedFlowState.SYNC_FAILURE ? versionControlInformation.getStatus().getStateExplanation() : null);

        writeLock.lock();
        try {
            updateVersionedComponentIds(this, versionedComponentIds);
            this.versionControlInfo.set(svci);
            // Force the local modifications to be re-calculated against the new information.
            versionControlFields.setFlowDifferences(null);

            final ProcessGroup parent = getParent();
            if (parent != null) {
                parent.onComponentModified();
            }

            // Synchronization with the Flow Registry is handed off to the scheduler as a framework task.
            scheduler.submitFrameworkTask(() -> synchronizeWithFlowRegistry(flowController.getFlowRegistryClient()));
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Creates a copy of the given Versioned Process Group in which any descendant group that has its own
     * Versioned Flow Coordinates is reduced to a stub (comments, component type, group identifier,
     * identifier, name, position and coordinates) with none of its contents. Descendant groups without
     * coordinates are copied recursively. The component collections of the copy refer to the same
     * collection instances as the source group.
     *
     * @param processGroup the group to copy; may be {@code null}
     * @param topLevel whether the given group is the top-level group, in which case the copy is given
     *                 no Versioned Flow Coordinates
     * @return the stripped copy, or {@code null} if the given group is {@code null}
     */
    private VersionedProcessGroup stripContentsFromRemoteDescendantGroups(final VersionedProcessGroup processGroup, final boolean topLevel) {
        if (processGroup == null) {
            return null;
        }

        final VersionedProcessGroup stripped = new VersionedProcessGroup();

        // Identifying information
        stripped.setIdentifier(processGroup.getIdentifier());
        stripped.setGroupIdentifier(processGroup.getGroupIdentifier());
        stripped.setName(processGroup.getName());
        stripped.setComments(processGroup.getComments());
        stripped.setComponentType(processGroup.getComponentType());
        stripped.setPosition(processGroup.getPosition());

        if (topLevel) {
            stripped.setVersionedFlowCoordinates(null);
        } else {
            stripped.setVersionedFlowCoordinates(processGroup.getVersionedFlowCoordinates());
        }

        // Contents that live directly within this group
        stripped.setProcessors(processGroup.getProcessors());
        stripped.setControllerServices(processGroup.getControllerServices());
        stripped.setConnections(processGroup.getConnections());
        stripped.setInputPorts(processGroup.getInputPorts());
        stripped.setOutputPorts(processGroup.getOutputPorts());
        stripped.setFunnels(processGroup.getFunnels());
        stripped.setLabels(processGroup.getLabels());
        stripped.setRemoteProcessGroups(processGroup.getRemoteProcessGroups());
        stripped.setVariables(processGroup.getVariables());

        final Set<VersionedProcessGroup> strippedChildren = new HashSet<>();
        for (final VersionedProcessGroup child : processGroup.getProcessGroups()) {
            if (child.getVersionedFlowCoordinates() != null) {
                // The child is versioned on its own, so keep only a stub that points at its coordinates.
                final VersionedProcessGroup stub = new VersionedProcessGroup();
                stub.setIdentifier(child.getIdentifier());
                stub.setGroupIdentifier(child.getGroupIdentifier());
                stub.setName(child.getName());
                stub.setComments(child.getComments());
                stub.setComponentType(child.getComponentType());
                stub.setPosition(child.getPosition());
                stub.setVersionedFlowCoordinates(child.getVersionedFlowCoordinates());

                strippedChildren.add(stub);
            } else {
                strippedChildren.add(stripContentsFromRemoteDescendantGroups(child, false));
            }
        }

        stripped.setProcessGroups(strippedChildren);
        return stripped;
    }

    /**
     * Removes the Version Control Information from this Process Group.
     *
     * @param removeVersionedComponentIds if {@code true}, the Versioned Component IDs of this group and its
     *                                    components are cleared as well (stopping at child groups that have
     *                                    their own Version Control Information)
     */
    @Override
    public void disconnectVersionControl(final boolean removeVersionedComponentIds) {
        writeLock.lock();
        try {
            this.versionControlInfo.set(null);

            if (!removeVersionedComponentIds) {
                return;
            }

            // Clear the versioned component id of each component (until another versioned PG is encountered)
            applyVersionedComponentIds(this, componentId -> null);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Applies the given Versioned Component IDs to the Process Group and its components, and to any
     * Controller Service obtained from the parent group that does not yet have a Versioned Component ID
     * but has an entry in the mapping.
     *
     * @param processGroup the group whose components should be updated
     * @param versionedComponentIds mapping of component identifier to Versioned Component ID; when null
     *                              or empty, nothing is changed
     */
    private void updateVersionedComponentIds(final ProcessGroup processGroup, final Map<String, String> versionedComponentIds) {
        final boolean nothingToApply = versionedComponentIds == null || versionedComponentIds.isEmpty();
        if (nothingToApply) {
            return;
        }

        applyVersionedComponentIds(processGroup, versionedComponentIds::get);

        final ProcessGroup parentGroup = processGroup.getParent();
        if (parentGroup == null) {
            return;
        }

        // If we versioned any parent groups' Controller Services, set their versioned component ids too.
        for (final ControllerServiceNode inheritedService : parentGroup.getControllerServices(true)) {
            if (inheritedService.getVersionedComponentId().isPresent()) {
                continue;
            }

            final String mappedId = versionedComponentIds.get(inheritedService.getIdentifier());
            if (mappedId != null) {
                inheritedService.setVersionedComponentId(mappedId);
            }
        }
    }


    /**
     * Sets the Versioned Component ID of the given Process Group and of every component within it to the
     * value that the lookup function yields for the component's identifier. Child groups without their own
     * Version Control Information are processed recursively; a child group that has its own Version Control
     * Information only has its own ID set, and only if it does not already have one.
     *
     * @param processGroup the group to update
     * @param lookup function mapping a component identifier to the Versioned Component ID to assign
     */
    private void applyVersionedComponentIds(final ProcessGroup processGroup, final Function<String, String> lookup) {
        final String groupVersionedId = lookup.apply(processGroup.getIdentifier());
        processGroup.setVersionedComponentId(groupVersionedId);

        processGroup.getConnections().forEach(connection -> {
            final String versionedId = lookup.apply(connection.getIdentifier());
            connection.setVersionedComponentId(versionedId);
        });
        processGroup.getProcessors().forEach(processor -> {
            final String versionedId = lookup.apply(processor.getIdentifier());
            processor.setVersionedComponentId(versionedId);
        });
        processGroup.getInputPorts().forEach(inputPort -> {
            final String versionedId = lookup.apply(inputPort.getIdentifier());
            inputPort.setVersionedComponentId(versionedId);
        });
        processGroup.getOutputPorts().forEach(outputPort -> {
            final String versionedId = lookup.apply(outputPort.getIdentifier());
            outputPort.setVersionedComponentId(versionedId);
        });
        processGroup.getLabels().forEach(label -> {
            final String versionedId = lookup.apply(label.getIdentifier());
            label.setVersionedComponentId(versionedId);
        });
        processGroup.getFunnels().forEach(funnel -> {
            final String versionedId = lookup.apply(funnel.getIdentifier());
            funnel.setVersionedComponentId(versionedId);
        });
        processGroup.getControllerServices(false).forEach(service -> {
            final String versionedId = lookup.apply(service.getIdentifier());
            service.setVersionedComponentId(versionedId);
        });

        // Remote Process Groups carry their own ports, which are updated right after the group itself.
        processGroup.getRemoteProcessGroups().forEach(remoteGroup -> {
            remoteGroup.setVersionedComponentId(lookup.apply(remoteGroup.getIdentifier()));

            remoteGroup.getInputPorts().forEach(remotePort -> remotePort.setVersionedComponentId(lookup.apply(remotePort.getIdentifier())));
            remoteGroup.getOutputPorts().forEach(remotePort -> remotePort.setVersionedComponentId(lookup.apply(remotePort.getIdentifier())));
        });

        for (final ProcessGroup child : processGroup.getProcessGroups()) {
            final boolean childHasOwnVersionControl = child.getVersionControlInformation() != null;

            if (!childHasOwnVersionControl) {
                applyVersionedComponentIds(child, lookup);
                continue;
            }

            // The child has its own Version Control Information: do not descend into it, and only give
            // it an ID if it does not have one already.
            if (!child.getVersionedComponentId().isPresent()) {
                child.setVersionedComponentId(lookup.apply(child.getIdentifier()));
            }
        }
    }


    /**
     * Synchronizes this Process Group with the Flow Registry that it is under version control with.
     * <p>
     * If the group has no Version Control Information, nothing is done. Otherwise, the flow snapshot for the
     * current version is fetched from the registry if it has not been obtained yet, the bucket/flow/registry
     * names are refreshed, and the 'stale' flag is updated by comparing the current version against the
     * registry's version count. Any failure to reach the registry is recorded as the sync failure explanation
     * and logged; no exception is propagated for {@link IOException} or {@code NiFiRegistryException}.
     *
     * @param flowRegistryClient the client used to look up the Flow Registry by its identifier
     */
    @Override
    public void synchronizeWithFlowRegistry(final FlowRegistryClient flowRegistryClient) {
        final StandardVersionControlInformation vci = versionControlInfo.get();
        if (vci == null) {
            return;
        }

        final String registryId = vci.getRegistryIdentifier();
        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
        if (flowRegistry == null) {
            final String message = String.format("Unable to synchronize Process Group with Flow Registry because Process Group was placed under Version Control using Flow Registry "
                    + "with identifier %s but cannot find any Flow Registry with this identifier", registryId);
            versionControlFields.setSyncFailureExplanation(message);

            LOG.error("Unable to synchronize {} with Flow Registry because Process Group was placed under Version Control using Flow Registry "
                    + "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId);
            return;
        }

        final VersionedProcessGroup snapshot = vci.getFlowSnapshot();
        if (snapshot == null && vci.getVersion() > 0) {
            // We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry.
            // This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry.
            try {
                final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion(), false);
                final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents();
                vci.setFlowSnapshot(registryFlow);
            } catch (final IOException | NiFiRegistryException e) {
                final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not retrieve version %s of flow with identifier %s in bucket %s",
                        vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier());
                versionControlFields.setSyncFailureExplanation(message);

                final String logErrorMessage = "Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}";

                // No need to print a full stacktrace for connection refused
                if (e instanceof ConnectException) {
                    LOG.error(logErrorMessage + " due to: {}", this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e.getLocalizedMessage());
                } else {
                    LOG.error(logErrorMessage, this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e);
                }
                return;
            }
        }

        try {
            final VersionedFlow versionedFlow = flowRegistry.getVersionedFlow(vci.getBucketIdentifier(), vci.getFlowIdentifier());
            final int latestVersion = (int) versionedFlow.getVersionCount();
            vci.setBucketName(versionedFlow.getBucketName());
            vci.setFlowName(versionedFlow.getName());
            vci.setFlowDescription(versionedFlow.getDescription());
            vci.setRegistryName(flowRegistry.getName());

            if (latestVersion == vci.getVersion()) {
                versionControlFields.setStale(false);
                if (latestVersion == 0) {
                    LOG.debug("{} does not have any version in the Registry", this);
                    versionControlFields.setLocallyModified(true);
                } else {
                    LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion);
                }
            } else {
                LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}",
                        this, vci.getVersion(), latestVersion);
                versionControlFields.setStale(true);
            }

            // A successful round trip to the registry clears any previously recorded sync failure.
            versionControlFields.setSyncFailureExplanation(null);
        } catch (final IOException | NiFiRegistryException e) {
            // Build the explanation by plain concatenation. The exception message must never be used as
            // (part of) a format string: a '%' in the message would make String.format throw, and the
            // sync failure would then never be recorded.
            final String message = "Failed to synchronize Process Group with Flow Registry : " + e.getMessage();
            versionControlFields.setSyncFailureExplanation(message);

            LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e);
        }
    }


    /**
     * Updates this Process Group so that it matches the proposed flow snapshot.
     * <p>
     * The current contents of the group are mapped to their versioned representation and compared against
     * the proposed contents in order to determine which versioned components are affected. The group is then
     * updated via {@code updateProcessGroup}. If that update fails and has left the group without Version
     * Control Information, the original Version Control Information is restored before the failure is rethrown.
     *
     * @param proposedSnapshot the snapshot describing the desired state of the flow
     * @param componentIdSeed the seed passed along for generating identifiers of newly created components
     * @param verifyNotDirty passed through to {@code verifyCanUpdate}
     * @param updateSettings passed to {@code updateProcessGroup} as its {@code updateName} argument
     * @param updateDescendantVersionedFlows passed through to {@code updateProcessGroup}
     * @throws IllegalStateException if a {@code ProcessorInstantiationException} occurs during the update
     */
    @Override
    public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings,
                           final boolean updateDescendantVersionedFlows) {
        writeLock.lock();
        try {
            verifyCanUpdate(proposedSnapshot, true, verifyNotDirty);

            final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
            final VersionedProcessGroup currentContents = flowMapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), true);

            final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Current Flow", currentContents);
            final ComparableDataFlow newFlow = new StandardComparableDataFlow("New Flow", proposedSnapshot.getFlowContents());

            final FlowComparator comparator = new StandardFlowComparator(newFlow, currentFlow, getAncestorServiceIds(), new StaticDifferenceDescriptor());
            final FlowComparison comparison = comparator.compare();

            final Set<String> updatedVersionedComponentIds = new HashSet<>();
            for (final FlowDifference difference : comparison.getDifferences()) {
                final DifferenceType differenceType = difference.getDifferenceType();

                // Bundle changes are ignored as local differences for now because we can't do anything with them.
                if (differenceType == DifferenceType.BUNDLE_CHANGED) {
                    continue;
                }

                final VersionedComponent component = difference.getComponentA() == null ? difference.getComponentB() : difference.getComponentA();

                // When the update adds a Controller Service, the service may already exist in this group or at a
                // higher level. If so, the existing service is compared against the proposed one, and the
                // component only counts as updated when the two actually differ.
                if (differenceType == DifferenceType.COMPONENT_ADDED && ComponentType.CONTROLLER_SERVICE == component.getComponentType()) {
                    final ControllerServiceNode existingService = getVersionedControllerService(this, component.getIdentifier());

                    if (existingService != null) {
                        final VersionedControllerService existingVersioned = flowMapper.mapControllerService(existingService, controllerServiceProvider,
                            Collections.singleton(existingService.getProcessGroupIdentifier()), new HashMap<>());
                        final Set<FlowDifference> serviceDifferences = comparator.compareControllerServices(existingVersioned, (VersionedControllerService) component);

                        if (!serviceDifferences.isEmpty()) {
                            updatedVersionedComponentIds.add(component.getIdentifier());
                        }

                        continue;
                    }
                }

                updatedVersionedComponentIds.add(component.getIdentifier());

                // A change to a remote port also marks the Remote Process Group that owns it.
                final ComponentType componentType = component.getComponentType();
                if (componentType == ComponentType.REMOTE_INPUT_PORT || componentType == ComponentType.REMOTE_OUTPUT_PORT) {
                    updatedVersionedComponentIds.add(((VersionedRemoteGroupPort) component).getRemoteGroupId());
                }
            }

            if (LOG.isInfoEnabled()) {
                final String differencesByLine = comparison.getDifferences().stream()
                        .map(FlowDifference::toString)
                        .collect(Collectors.joining("\n"));

                // TODO: Until we move to NiFi Registry 0.6.0, avoid using proposedSnapshot.toString() because it throws a NullPointerException
                final String proposedSnapshotDetails = "VersionedFlowSnapshot[flowContentsId=" + proposedSnapshot.getFlowContents().getIdentifier()
                        + ", flowContentsName=" + proposedSnapshot.getFlowContents().getName() + ", NoMetadataAvailable]";
                LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshotDetails,
                        comparison.getDifferences().size(), differencesByLine);
            }

            final Set<String> knownVariables = getKnownVariableNames();

            final StandardVersionControlInformation vciBeforeUpdate = this.versionControlInfo.get();
            try {
                updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, updateDescendantVersionedFlows, knownVariables,
                    proposedSnapshot.getParameterContexts());
            } catch (final Throwable t) {
                // The proposed snapshot may have no Versioned Flow Coordinates, in which case #updateProcessGroup may
                // null out this group's Version Control Info. On the normal path the information is set appropriately
                // at the end, but when an Exception is thrown we must make sure that it is not left as null.
                // The AtomicReference exists so that the value can be read without a read lock; it is never updated
                // outside of a write lock, so get() followed by set() is free of the 'check-then-modify' problem here.
                if (this.versionControlInfo.get() == null) {
                    this.versionControlInfo.set(vciBeforeUpdate);
                }

                throw t;
            }
        } catch (final ProcessorInstantiationException pie) {
            throw new IllegalStateException("Failed to update flow", pie);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Collects the Versioned Component IDs of the Controller Services obtained from the parent group
     * via {@code getControllerServices(true)}.
     *
     * @return the Versioned Component IDs of those services, or an empty set if this group has no parent
     */
    @Override
    public Set<String> getAncestorServiceIds() {
        final ProcessGroup parentGroup = getParent();
        if (parentGroup == null) {
            return Collections.emptySet();
        }

        // Map each Controller Service to its Versioned Component ID, if it has one. Otherwise, generate
        // the ID in the same way that the Flow Mapper does, so that the Controller Service can be found
        // when doing a Flow Diff.
        final Set<String> ancestorServiceIds = new HashSet<>();
        for (final ControllerServiceNode ancestorService : parentGroup.getControllerServices(true)) {
            final String versionedId = ancestorService.getVersionedComponentId().orElse(
                    NiFiRegistryFlowMapper.generateVersionedComponentId(ancestorService.getIdentifier()));
            ancestorServiceIds.add(versionedId);
        }

        return ancestorServiceIds;
    }

    /**
     * Searches the given group, and then each of its ancestors in turn, for a Controller Service whose
     * Versioned Component ID (assigned, or generated from its identifier when none is assigned) matches
     * the given ID.
     *
     * @param group the group at which to start the search; may be {@code null}
     * @param versionedComponentId the Versioned Component ID to look for
     * @return the first matching Controller Service, or {@code null} if none is found
     */
    private ControllerServiceNode getVersionedControllerService(final ProcessGroup group, final String versionedComponentId) {
        ProcessGroup groupToSearch = group;

        while (groupToSearch != null) {
            for (final ControllerServiceNode candidate : groupToSearch.getControllerServices(false)) {
                final String candidateVersionedId = candidate.getVersionedComponentId().orElse(
                        NiFiRegistryFlowMapper.generateVersionedComponentId(candidate.getIdentifier()));

                if (candidateVersionedId.equals(versionedComponentId)) {
                    return candidate;
                }
            }

            // Not found at this level; move one level up.
            groupToSearch = groupToSearch.getParent();
        }

        return null;
    }

    /**
     * @return the names of all variables defined by this Process Group or by any of its ancestors
     */
    private Set<String> getKnownVariableNames() {
        final Set<String> knownNames = new HashSet<>();
        populateKnownVariableNames(this, knownNames);
        return knownNames;
    }

    /**
     * Adds the names of all variables in the variable registry of the given group, and of each of its
     * ancestors, to the given set.
     *
     * @param group the group at which to start; must not be {@code null}
     * @param knownVariables the set to which the variable names are added
     */
    private void populateKnownVariableNames(final ProcessGroup group, final Set<String> knownVariables) {
        ProcessGroup current = group;

        // Walk from the given group up to the root, gathering variable names at every level.
        do {
            for (final VariableDescriptor descriptor : current.getVariableRegistry().getVariableMap().keySet()) {
                knownVariables.add(descriptor.getName());
            }

            current = current.getParent();
        } while (current != null);
    }


    private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed,
                                    final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final boolean updateDescendantVersionedGroups,
                                    final Set<String> variablesToSkip, final Map<String, VersionedParameterContext> versionedParameterContexts) throws ProcessorInstantiationException {

        // During the flow update, we will use temporary names for process group ports. This is because port names must be
        // unique within a process group, but during an update we might temporarily be in a state where two ports have the same name.
        // For example, if a process group update involves removing/renaming port A, and then adding/updating port B where B is given
        // A's former name. This is a valid state by the end of the flow update, but for a brief moment there may be two ports with the
        // same name. To avoid this conflict, we keep the final names in a map indexed by port id, use a temporary name for each port
        // during the update, and after all ports have been added/updated/removed, we set the final names on all ports.
        final Map<Port, String> proposedPortFinalNames = new HashMap<>();

        group.setComments(proposed.getComments());

        if (updateName) {
            group.setName(proposed.getName());
        }

        if (updatePosition && proposed.getPosition() != null) {
            group.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
        }

        updateParameterContext(group, proposed, versionedParameterContexts, componentIdSeed);
        updateVariableRegistry(group, proposed, variablesToSkip);

        final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
        if (remoteCoordinates == null) {
            group.disconnectVersionControl(false);
        } else {
            final String registryId = flowController.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl());
            final String bucketId = remoteCoordinates.getBucketId();
            final String flowId = remoteCoordinates.getFlowId();
            final int version = remoteCoordinates.getVersion();

            final FlowRegistry flowRegistry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
            final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();

            final VersionedFlowState flowState = remoteCoordinates.getLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;

            final VersionControlInformation vci = new StandardVersionControlInformation.Builder()
                    .registryId(registryId)
                    .registryName(registryName)
                    .bucketId(bucketId)
                    .bucketName(bucketId)
                    .flowId(flowId)
                    .flowName(flowId)
                    .version(version)
                    .flowSnapshot(proposed)
                    .status(new StandardVersionedFlowStatus(flowState, flowState.getDescription()))
                    .build();

            group.setVersionControlInformation(vci, Collections.emptyMap());
        }


        // Controller Services
        // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller
        // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding
        // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each
        // Controller Service. This way, we ensure that all services have been created before setting the properties. This allows us to
        // properly obtain the correct mapping of Controller Service VersionedComponentID to Controller Service instance id.
        final Map<String, ControllerServiceNode> servicesByVersionedId = group.getControllerServices(false).stream()
                .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
                        NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));

        final Set<String> controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet());

        final Map<ControllerServiceNode, VersionedControllerService> services = new HashMap<>();

        // Add any Controller Service that does not yet exist.
        final Map<String, ControllerServiceNode> servicesAdded = new HashMap<>();
        for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
            ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
            if (service == null) {
                service = addControllerService(group, proposedService, componentIdSeed);
                LOG.info("Added {} to {}", service, group);
                servicesAdded.put(proposedService.getIdentifier(), service);
            }

            services.put(service, proposedService);
        }

        // Because we don't know what order to instantiate the Controller Services, it's possible that we have two services such that Service A references Service B.
        // If Service A happens to get created before Service B, the identifiers won't get matched up. As a result, we now iterate over all created Controller Services
        // and update them again now that all Controller Services have been created at this level, so that the linkage can now be properly established.
        for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
            final ControllerServiceNode addedService = servicesAdded.get(proposedService.getIdentifier());
            if (addedService == null) {
                continue;
            }

            updateControllerService(addedService, proposedService);
        }

        // Update all of the Controller Services to match the VersionedControllerService
        for (final Map.Entry<ControllerServiceNode, VersionedControllerService> entry : services.entrySet()) {
            final ControllerServiceNode service = entry.getKey();
            final VersionedControllerService proposedService = entry.getValue();

            if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
                updateControllerService(service, proposedService);
                LOG.info("Updated {}", service);
            }

            controllerServicesRemoved.remove(proposedService.getIdentifier());
        }

        // Before we can update child groups, we must first remove any connections that are connected to those child groups' input/output ports.
        // We cannot add or update connections yet, though. That must be done at the end, as it's possible that the component that is the source/destination of the connection
        // has not yet been added.
        final Map<String, Connection> connectionsByVersionedId = group.getConnections().stream()
            .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
                NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
        final Set<String> connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet());

        for (final VersionedConnection proposedConnection : proposed.getConnections()) {
            connectionsRemoved.remove(proposedConnection.getIdentifier());
        }

        // Connections must be the first thing to remove, not the last. Otherwise, we will fail
        // to remove a component if it has a connection going to it!
        for (final String removedVersionedId : connectionsRemoved) {
            final Connection connection = connectionsByVersionedId.get(removedVersionedId);
            LOG.info("Removing {} from {}", connection, group);
            group.removeConnection(connection);
            flowManager.onConnectionRemoved(connection);
        }


        // Child groups
        final Map<String, ProcessGroup> childGroupsByVersionedId = group.getProcessGroups().stream()
                .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
                        NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
        final Set<String> childGroupsRemoved = new HashSet<>(childGroupsByVersionedId.keySet());

        for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
            final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
            final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates();

            // if there is a nested process group that is versioned controlled, make sure get the param contexts that go with that snapshot
            // instead of the ones from the parent which would have been passed in to this method
            Map<String, VersionedParameterContext> childParameterContexts = versionedParameterContexts;
            if (childCoordinates != null && updateDescendantVersionedGroups) {
                childParameterContexts = getVersionedParameterContexts(childCoordinates);
            }

            if (childGroup == null) {
                final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip, childParameterContexts);
                flowManager.onProcessGroupAdded(added);
                added.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
                LOG.info("Added {} to {}", added, this);
            } else if (childCoordinates == null || updateDescendantVersionedGroups) {
                updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, true, updateDescendantVersionedGroups,
                    variablesToSkip, childParameterContexts);
                LOG.info("Updated {}", childGroup);
            }

            childGroupsRemoved.remove(proposedChildGroup.getIdentifier());
        }

        // Funnels
        final Map<String, Funnel> funnelsByVersionedId = group.getFunnels().stream()
                .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
                        NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
        final Set<String> funnelsRemoved = new HashSet<>(funnelsByVersionedId.keySet());

        for (final VersionedFunnel proposedFunnel : proposed.getFunnels()) {
            final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier());
            if (funnel == null) {
                final Funnel added = addFunnel(group, proposedFunnel, componentIdSeed);
                flowManager.onFunnelAdded(added);
                LOG.info("Added {} to {}", added, this);
            } else if (updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) {
                updateFunnel(funnel, proposedFunnel);
                LOG.info("Updated {}", funnel);
            } else {
                funnel.setPosition(new Position(proposedFunnel.getPosition().getX(), proposedFunnel.getPosition().getY()));
            }

            funnelsRemoved.remove(proposedFunnel.getIdentifier());
        }


        // Input Ports
        final Map<String, Port> inputPortsByVersionedId = group.getInputPorts().stream()
                .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
                        NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
        final Set<String> inputPortsRemoved = new HashSet<>(inputPortsByVersionedId.keySet());

        for (final VersionedPort proposedPort : proposed.getInputPorts()) {
            final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
            if (port == null) {
                final String temporaryName = generateTemporaryPortName(proposedPort);
                final Port added = addInputPort(group, proposedPort, componentIdSeed, temporaryName);
                proposedPortFinalNames.put(added, proposedPort.getName());
                flowManager.onInputPortAdded(added);
                LOG.info("Added {} to {}", added, this);
            } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
                final String temporaryName = generateTemporaryPortName(proposedPort);
                proposedPortFinalNames.put(port, proposedPort.getName());
                updatePort(port, proposedPort, temporaryName);
                LOG.info("Updated {}", port);
            } else {
                port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
            }

            inputPortsRemoved.remove(proposedPort.getIdentifier());
        }

        // Output Ports
        final Map<String, Port> outputPortsByVersionedId = group.getOutputPorts().stream()
                .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
                        NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
        final Set<String> outputPortsRemoved = new HashSet<>(outputPortsByVersionedId.keySet());

        for (final VersionedPort proposedPort : proposed.getOutputPorts()) {
            final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier());
            if (port == null) {
                final String temporaryName = generateTemporaryPortName(proposedPort);
                final Port added = addOutputPort(group, proposedPort, componentIdSeed, temporaryName);
                proposedPortFinalNames.put(added, proposedPort.getName());
                flowManager.onOutputPortAdded(added);
                LOG.info("Added {} to {}", added, this);
            } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
                final String temporaryName = generateTemporaryPortName(proposedPort);
                proposedPortFinalNames.put(port, proposedPort.getName());
                updatePort(port, proposedPort, temporaryName);
                LOG.info("Updated {}", port);
            } else {
                port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
            }

            outputPortsRemoved.remove(proposedPort.getIdentifier());
        }


        // Labels
        final Map<String, Label> labelsByVersionedId = group.getLabels().stream()
                .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
                        NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
        final Set<String> labelsRemoved = new HashSet<>(labelsByVersionedId.keySet());

        for (final VersionedLabel proposedLabel : proposed.getLabels()) {
            final Label label = labelsByVersionedId.get(proposedLabel.getIdentifier());
            if (label == null) {
                final Label added = addLabel(group, proposedLabel, componentIdSeed);
                LOG.info("Added {} to {}", added, this);
            } else if (updatedVersionedComponentIds.contains(proposedLabel.getIdentifier())) {
                updateLabel(label, proposedLabel);
                LOG.info("Updated {}", label);
            } else {
                label.setPosition(new Position(proposedLabel.getPosition().getX(), proposedLabel.getPosition().getY()));
            }

            labelsRemoved.remove(proposedLabel.getIdentifier());
        }


        // Processors
        final Map<String, ProcessorNode> processorsByVersionedId = group.getProcessors().stream()
                .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
                        NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
        final Set<String> processorsRemoved = new HashSet<>(processorsByVersionedId.keySet());
        final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships = new HashMap<>();

        for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
            final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
            if (processor == null) {
                final ProcessorNode added = addProcessor(group, proposedProcessor, componentIdSeed);
                flowManager.onProcessorAdded(added);

                final Set<Relationship> proposedAutoTerminated =
                    proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
                        .map(added::getRelationship)
                        .collect(Collectors.toSet());
                autoTerminatedRelationships.put(added, proposedAutoTerminated);
                LOG.info("Added {} to {}", added, this);
            } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
                updateProcessor(processor, proposedProcessor);

                final Set<Relationship> proposedAutoTerminated =
                    proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
                        .map(processor::getRelationship)
                        .collect(Collectors.toSet());

                if (!processor.getAutoTerminatedRelationships().equals(proposedAutoTerminated)) {
                    autoTerminatedRelationships.put(processor, proposedAutoTerminated);
                }

                LOG.info("Updated {}", processor);
            } else {
                processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
            }

            processorsRemoved.remove(proposedProcessor.getIdentifier());
        }


        // Remote Groups
        final Map<String, RemoteProcessGroup> rpgsByVersionedId = group.getRemoteProcessGroups().stream()
                .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
                        NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
        final Set<String> rpgsRemoved = new HashSet<>(rpgsByVersionedId.keySet());

        for (final VersionedRemoteProcessGroup proposedRpg : proposed.getRemoteProcessGroups()) {
            final RemoteProcessGroup rpg = rpgsByVersionedId.get(proposedRpg.getIdentifier());
            if (rpg == null) {
                final RemoteProcessGroup added = addRemoteProcessGroup(group, proposedRpg, componentIdSeed);
                LOG.info("Added {} to {}", added, this);
            } else if (updatedVersionedComponentIds.contains(proposedRpg.getIdentifier())) {
                updateRemoteProcessGroup(rpg, proposedRpg, componentIdSeed);
                LOG.info("Updated {}", rpg);
            } else {
                rpg.setPosition(new Position(proposedRpg.getPosition().getX(), proposedRpg.getPosition().getY()));
            }

            rpgsRemoved.remove(proposedRpg.getIdentifier());
        }


        // Add and update Connections
        for (final VersionedConnection proposedConnection : proposed.getConnections()) {
            final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier());
            if (connection == null) {
                final Connection added = addConnection(group, proposedConnection, componentIdSeed);
                flowManager.onConnectionAdded(added);
                LOG.info("Added {} to {}", added, this);
            } else if (isUpdateable(connection)) {
                // If the connection needs to be updated, then the source and destination will already have
                // been stopped (else, the validation above would fail). So if the source or the destination is running,
                // then we know that we don't need to update the connection.
                updateConnection(connection, proposedConnection);
                LOG.info("Updated {}", connection);
            }
        }

        // Remove components that exist in the local flow but not the remote flow.

        // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
        // We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated,
        // then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a
        // Connection for that relationship exists. This will throw an Exception.
        autoTerminatedRelationships.forEach(ProcessorNode::setAutoTerminatedRelationships);

        // Remove all controller services no longer in use
        for (final String removedVersionedId : controllerServicesRemoved) {
            final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId);
            LOG.info("Removing {} from {}", service, group);
            // Must remove Controller Service through Flow Controller in order to remove from cache
            flowController.getControllerServiceProvider().removeControllerService(service);
        }

        for (final String removedVersionedId : funnelsRemoved) {
            final Funnel funnel = funnelsByVersionedId.get(removedVersionedId);
            LOG.info("Removing {} from {}", funnel, group);
            group.removeFunnel(funnel);
        }

        for (final String removedVersionedId : inputPortsRemoved) {
            final Port port = inputPortsByVersionedId.get(removedVersionedId);
            LOG.info("Removing {} from {}", port, group);
            group.removeInputPort(port);
        }

        for (final String removedVersionedId : outputPortsRemoved) {
            final Port port = outputPortsByVersionedId.get(removedVersionedId);
            LOG.info("Removing {} from {}", port, group);
            group.removeOutputPort(port);
        }

        // Now that all input/output ports have been removed, we should be able to update
        // all ports to the final name that was proposed in the new flow version.
        for (final Map.Entry<Port, String> portAndFinalName : proposedPortFinalNames.entrySet()) {
            final Port port = portAndFinalName.getKey();
            final String finalName = portAndFinalName.getValue();
            LOG.info("Updating {} to replace temporary name with final name", port);

            // For public ports we need to consider if another public port exists somewhere else in the flow with the
            // same name, and if so then rename the incoming port so the flow can still be imported
            if (port instanceof PublicPort) {
                final PublicPort publicPort = (PublicPort) port;
                final String publicPortFinalName = getPublicPortFinalName(publicPort, finalName);
                updatePortToSetFinalName(publicPort, publicPortFinalName);
            } else {
                updatePortToSetFinalName(port, finalName);
            }
        }

        for (final String removedVersionedId : labelsRemoved) {
            final Label label = labelsByVersionedId.get(removedVersionedId);
            LOG.info("Removing {} from {}", label, group);
            group.removeLabel(label);
        }

        for (final String removedVersionedId : processorsRemoved) {
            final ProcessorNode processor = processorsByVersionedId.get(removedVersionedId);
            LOG.info("Removing {} from {}", processor, group);
            group.removeProcessor(processor);
        }

        for (final String removedVersionedId : rpgsRemoved) {
            final RemoteProcessGroup rpg = rpgsByVersionedId.get(removedVersionedId);
            LOG.info("Removing {} from {}", rpg, group);
            group.removeRemoteProcessGroup(rpg);
        }

        for (final String removedVersionedId : childGroupsRemoved) {
            final ProcessGroup childGroup = childGroupsByVersionedId.get(removedVersionedId);
            LOG.info("Removing {} from {}", childGroup, group);
            group.removeProcessGroup(childGroup);
        }
    }

    /**
     * Looks up the Parameter Contexts that belong to the versioned flow identified by the given coordinates.
     * The registry is resolved from the coordinates' registry URL, and the flow snapshot for the
     * bucket / flow / version triple is then fetched from it.
     *
     * @param versionedFlowCoordinates the coordinates (registry URL, bucket, flow and version) of the versioned flow
     * @return the Parameter Contexts reported by the retrieved flow snapshot
     * @throws ResourceNotFoundException if no Flow Registry is registered for the URL, or the resolved identifier maps to no registry
     * @throws IllegalArgumentException if the registry raises a NiFiRegistryException while retrieving the flow
     * @throws IllegalStateException if an I/O failure occurs while communicating with the registry
     */
    private Map<String,VersionedParameterContext> getVersionedParameterContexts(final VersionedFlowCoordinates versionedFlowCoordinates) {
        final FlowRegistryClient flowRegistryClient = flowController.getFlowRegistryClient();

        final String registryId = flowRegistryClient.getFlowRegistryId(versionedFlowCoordinates.getRegistryUrl());
        if (registryId == null) {
            throw new ResourceNotFoundException("Could not find any Flow Registry registered with url: " + versionedFlowCoordinates.getRegistryUrl());
        }

        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
        if (flowRegistry == null) {
            throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + registryId);
        }

        final String bucketId = versionedFlowCoordinates.getBucketId();
        final String flowId = versionedFlowCoordinates.getFlowId();
        final int flowVersion = versionedFlowCoordinates.getVersion();

        try {
            final VersionedFlowSnapshot childSnapshot = flowRegistry.getFlowContents(bucketId, flowId, flowVersion, false);
            return childSnapshot.getParameterContexts();
        } catch (final NiFiRegistryException e) {
            // Chain the original exception so the registry's own explanation is not lost from the stack trace.
            throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
                    + bucketId + ", Flow " + flowId + ", Version " + flowVersion, e);
        } catch (final IOException ioe) {
            // Chain the I/O failure as the cause so the underlying communication problem can be diagnosed.
            throw new IllegalStateException(
                    "Failed to communicate with Flow Registry when attempting to retrieve a versioned flow", ioe);
        }
    }

    /**
     * Builds a new Parameter Context from its versioned representation. Every versioned parameter is converted
     * into a {@link Parameter} (name, description, sensitivity and value) and the resulting map, keyed by parameter
     * name, is handed to the Flow Manager together with the requested identifier and the versioned context's name.
     *
     * @param versionedParameterContext the versioned definition to convert
     * @param parameterContextId the identifier to give the new Parameter Context
     * @return the Parameter Context returned by the Flow Manager
     */
    private ParameterContext createParameterContext(final VersionedParameterContext versionedParameterContext, final String parameterContextId) {
        final Map<String, Parameter> parametersByName = new HashMap<>();

        for (final VersionedParameter source : versionedParameterContext.getParameters()) {
            final ParameterDescriptor.Builder descriptorBuilder = new ParameterDescriptor.Builder();
            descriptorBuilder.name(source.getName());
            descriptorBuilder.description(source.getDescription());
            descriptorBuilder.sensitive(source.isSensitive());

            parametersByName.put(source.getName(), new Parameter(descriptorBuilder.build(), source.getValue()));
        }

        final String contextName = versionedParameterContext.getName();
        return flowController.getFlowManager().createParameterContext(parameterContextId, contextName, parametersByName);
    }

    /**
     * Adds to the current Parameter Context every parameter of the versioned context that the current context
     * does not already define. Parameters that are already present are not included in the map passed on,
     * so this method does not supply new values for them.
     *
     * @param versionedParameterContext the versioned context whose parameters are candidates for addition
     * @param currentParameterContext the context that receives the parameters it is missing
     */
    private void addMissingParameters(final VersionedParameterContext versionedParameterContext, final ParameterContext currentParameterContext) {
        final Map<String, Parameter> missingParameters = new HashMap<>();

        for (final VersionedParameter candidate : versionedParameterContext.getParameters()) {
            final String candidateName = candidate.getName();

            // Only parameters unknown to the current context are collected.
            if (!currentParameterContext.getParameter(candidateName).isPresent()) {
                final ParameterDescriptor descriptor = new ParameterDescriptor.Builder()
                    .name(candidate.getName())
                    .description(candidate.getDescription())
                    .sensitive(candidate.isSensitive())
                    .build();

                missingParameters.put(candidate.getName(), new Parameter(descriptor, candidate.getValue()));
            }
        }

        // Called even when nothing is missing, exactly as an empty map.
        currentParameterContext.setParameters(missingParameters);
    }

    /**
     * Searches the Parameter Contexts known to the Parameter Context Manager for one with the given name.
     *
     * @param contextName the name to look for
     * @return a Parameter Context whose name equals {@code contextName}, or {@code null} if there is none
     */
    private ParameterContext getParameterContextByName(final String contextName) {
        for (final ParameterContext candidate : flowController.getFlowManager().getParameterContextManager().getParameterContexts()) {
            if (candidate.getName().equals(contextName)) {
                return candidate;
            }
        }

        return null;
    }

    /**
     * Aligns the Parameter Context of the given group with the one named by the proposed versioned group.
     * <ul>
     *   <li>If the proposed group names no Parameter Context, nothing is done.</li>
     *   <li>If the group has no Parameter Context yet, an existing context with the same name is reused (and given any
     *       parameters it is missing); otherwise a new context is created. The selected context is bound to the group.</li>
     *   <li>If the group already has a Parameter Context, it is only given the parameters it is missing.</li>
     * </ul>
     *
     * @param group the group whose Parameter Context is to be updated
     * @param proposed the versioned group describing the desired state
     * @param versionedParameterContexts the versioned Parameter Contexts available, keyed by context name
     * @param componentIdSeed seed used when generating the identifier of a newly created Parameter Context
     * @throws IllegalStateException if the proposed Parameter Context name is not present in {@code versionedParameterContexts}
     */
    private void updateParameterContext(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
                                        final String componentIdSeed) {
        final ParameterContext currentParamContext = group.getParameterContext();
        final String proposedParameterContextName = proposed.getParameterContextName();
        if (proposedParameterContextName == null) {
            return;
        }

        // Both branches below need the versioned definition, so resolve it once and fail with a descriptive message
        // rather than a bare NullPointerException when the proposed name has no matching definition.
        final VersionedParameterContext versionedParameterContext =
            versionedParameterContexts == null ? null : versionedParameterContexts.get(proposedParameterContextName);
        if (versionedParameterContext == null) {
            final Object availableNames = versionedParameterContexts == null ? "[]" : versionedParameterContexts.keySet();
            throw new IllegalStateException("Proposed Parameter Context '" + proposedParameterContextName + "' for " + group
                + " does not exist in the set of available Parameter Contexts " + availableNames);
        }

        if (currentParamContext != null) {
            // Update the current Parameter Context so that it has any Parameters included in the proposed context
            addMissingParameters(versionedParameterContext, currentParamContext);
            return;
        }

        // The group has no Parameter Context: reuse one with the same name if it exists, otherwise create it.
        final ParameterContext contextByName = getParameterContextByName(versionedParameterContext.getName());
        final ParameterContext selectedParameterContext;
        if (contextByName == null) {
            final String parameterContextId = generateUuid(versionedParameterContext.getName(), versionedParameterContext.getName(), componentIdSeed);
            selectedParameterContext = createParameterContext(versionedParameterContext, parameterContextId);
        } else {
            selectedParameterContext = contextByName;
            addMissingParameters(versionedParameterContext, selectedParameterContext);
        }

        group.setParameterContext(selectedParameterContext);
    }

    /**
     * Passes to the group every variable of the proposed flow that the group does not define yet and that is not
     * listed in {@code variablesToSkip}. Variables the group already defines are never included, so their values
     * are not overwritten from the proposed flow here.
     * <p>
     * NOTE(review): nothing in this method removes variables that are absent from the proposed flow; whether
     * {@code ProcessGroup#setVariables} does so is not visible from here.
     *
     * @param group the group whose variable registry is updated
     * @param proposed the versioned group that supplies the candidate variables
     * @param variablesToSkip names of variables that must not be added
     */
    private void updateVariableRegistry(final ProcessGroup group, final VersionedProcessGroup proposed, final Set<String> variablesToSkip) {
        // Names of the variables currently registered for the group.
        final Set<String> knownVariableNames = new HashSet<>();
        for (final VariableDescriptor descriptor : group.getVariableRegistry().getVariableMap().keySet()) {
            knownVariableNames.add(descriptor.getName());
        }

        // Collect only the proposed variables that are new to the group and not explicitly skipped.
        final Map<String, String> variablesToAdd = new HashMap<>();
        proposed.getVariables().forEach((variableName, variableValue) -> {
            if (knownVariableNames.contains(variableName) || variablesToSkip.contains(variableName)) {
                return;
            }

            variablesToAdd.put(variableName, variableValue);
        });

        group.setVariables(variablesToAdd);
    }


    /**
     * Determines the name a public port can take without clashing with a different public port of the same direction.
     * Starting from the proposed name, "Copy of " is prepended repeatedly until the Flow Manager either reports no
     * public port with that name or reports the given port itself.
     *
     * @param publicPort the port that is about to be renamed
     * @param proposedFinalName the name requested for the port
     * @return the proposed name, prefixed with as many "Copy of " as were needed to avoid a clash
     */
    private String getPublicPortFinalName(final PublicPort publicPort, final String proposedFinalName) {
        String candidateName = proposedFinalName;

        while (true) {
            final Optional<Port> nameHolder = TransferDirection.RECEIVE == publicPort.getDirection()
                ? flowManager.getPublicInputPort(candidateName)
                : flowManager.getPublicOutputPort(candidateName);

            // The name is usable when nobody holds it, or when the holder is the port being renamed.
            if (!nameHolder.isPresent() || nameHolder.get().getIdentifier().equals(publicPort.getIdentifier())) {
                return candidateName;
            }

            candidateName = "Copy of " + candidateName;
        }
    }

    /**
     * Tells whether the given connection may be updated: neither endpoint may be a running component,
     * except that an endpoint of type {@link ConnectableType#FUNNEL} is never an obstacle.
     *
     * @param connection the connection to inspect
     * @return {@code true} if both the source and the destination are either a funnel or not running
     */
    private boolean isUpdateable(final Connection connection) {
        final Connectable source = connection.getSource();
        final boolean sourceAllowsUpdate = source.getConnectableType() == ConnectableType.FUNNEL || !source.isRunning();
        if (!sourceAllowsUpdate) {
            return false;
        }

        final Connectable destination = connection.getDestination();
        if (destination.getConnectableType() == ConnectableType.FUNNEL) {
            return true;
        }

        return !destination.isRunning();
    }

    /**
     * Derives a temporary name for a port by appending the port's versioned identifier, in parentheses,
     * to the name proposed for it: {@code "<name> (<identifier>)"}.
     *
     * @param proposedPort the versioned port for which a temporary name is needed
     * @return the proposed name followed by a space and the parenthesized identifier
     */
    private String generateTemporaryPortName(final VersionedPort proposedPort) {
        final String identifier = proposedPort.getIdentifier();
        final String finalName = proposedPort.getName();
        return String.format("%s (%s)", finalName, identifier);
    }

    /**
     * Renames the given port while holding this instance's write lock.
     *
     * @param port the port to rename
     * @param name the name to assign to the port
     */
    private void updatePortToSetFinalName(final Port port, final String name) {
        // Lock before entering the try block so the finally clause only ever releases a lock that is held.
        writeLock.lock();
        try {
            port.setName(name);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Generates a component identifier in UUID form.
     * <p>
     * The most significant 64 bits are always taken from the name-based UUID of {@code propposedId + destinationGroupId}.
     * The least significant 64 bits are random when the seed is blank; otherwise they are taken from the name-based
     * UUID of {@code propposedId + destinationGroupId + seed}, which makes the result repeatable for the same inputs.
     *
     * @param propposedId the identifier proposed for the component
     * @param destinationGroupId the identifier of the group the component is destined for
     * @param seed optional seed; when blank the lower half of the UUID is random
     * @return the string form of the generated UUID
     */
    private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) {
        final String nameBasis = propposedId + destinationGroupId;

        // The upper half comes from a name-based UUID, so whatever version bits it carries are kept in the result.
        final long mostSignificantBits = UUID.nameUUIDFromBytes(nameBasis.getBytes(StandardCharsets.UTF_8)).getMostSignificantBits();

        final long leastSignificantBits;
        if (StringUtils.isBlank(seed)) {
            leastSignificantBits = randomGenerator.nextLong();
        } else {
            final UUID seededId = UUID.nameUUIDFromBytes((nameBasis + seed).getBytes(StandardCharsets.UTF_8));
            leastSignificantBits = seededId.getLeastSignificantBits();
        }

        final UUID generated = new UUID(mostSignificantBits, leastSignificantBits);
        LOG.debug("Generating UUID {} from currentId={}, seed={}", generated, propposedId, seed);
        return generated.toString();
    }


    /**
     * Creates a new child Process Group from its versioned representation and adds it to the destination group.
     * The new group receives a generated identifier, is linked to the versioned component id and to its parent,
     * is populated via {@code updateProcessGroup}, and only then is added to the destination.
     *
     * @param destination the group that will contain the new group
     * @param proposed the versioned definition of the group to create
     * @param componentIdSeed seed used when generating component identifiers
     * @param variablesToSkip names of variables that must not be added to the new group
     * @param versionedParameterContexts the versioned Parameter Contexts available, keyed by context name
     * @return the newly created group
     * @throws ProcessorInstantiationException declared by this method; presumably raised while populating the group
     */
    private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed, final Set<String> variablesToSkip,
                                         final Map<String, VersionedParameterContext> versionedParameterContexts)
            throws ProcessorInstantiationException {
        final String newGroupId = generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed);
        final ProcessGroup created = flowManager.createProcessGroup(newGroupId);

        created.setVersionedComponentId(proposed.getIdentifier());
        created.setParent(destination);

        // Populate the group's contents before it is attached to the destination.
        updateProcessGroup(created, proposed, componentIdSeed, Collections.emptySet(), true, true, true, variablesToSkip, versionedParameterContexts);

        destination.addProcessGroup(created);
        return created;
    }

    /**
     * Applies the settings of the proposed versioned connection to an existing connection: bend points,
     * destination, label index, name, relationships, z-index, and the queue's back pressure, expiration,
     * prioritizer and load-balancing configuration.
     *
     * @param connection the connection to update
     * @param proposed the versioned connection whose settings are applied
     * @throws IllegalStateException if one of the proposed prioritizers cannot be created; the underlying
     *             failure is attached as the cause
     * @throws IllegalArgumentException if the proposed load balance strategy or compression name does not
     *             match a constant of the corresponding enum
     */
    private void updateConnection(final Connection connection, final VersionedConnection proposed) {
        connection.setBendPoints(proposed.getBends() == null ? Collections.emptyList() :
                proposed.getBends().stream()
                        .map(pos -> new Position(pos.getX(), pos.getY()))
                        .collect(Collectors.toList()));

        connection.setDestination(getConnectable(connection.getProcessGroup(), proposed.getDestination()));
        connection.setLabelIndex(proposed.getLabelIndex());
        connection.setName(proposed.getName());
        connection.setRelationships(proposed.getSelectedRelationships().stream()
                .map(name -> new Relationship.Builder().name(name).build())
                .collect(Collectors.toSet()));
        connection.setZIndex(proposed.getzIndex());

        final FlowFileQueue queue = connection.getFlowFileQueue();
        queue.setBackPressureDataSizeThreshold(proposed.getBackPressureDataSizeThreshold());
        queue.setBackPressureObjectThreshold(proposed.getBackPressureObjectThreshold());
        queue.setFlowFileExpiration(proposed.getFlowFileExpiration());

        final List<FlowFilePrioritizer> prioritizers = proposed.getPrioritizers() == null ? Collections.emptyList() : proposed.getPrioritizers().stream()
                .map(prioritizerName -> {
                    try {
                        return flowManager.createPrioritizer(prioritizerName);
                    } catch (final Exception e) {
                        // Keep the original failure as the cause so the reason the prioritizer could not be created is not lost.
                        throw new IllegalStateException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier(), e);
                    }
                })
                .collect(Collectors.toList());

        queue.setPriorities(prioritizers);

        // A missing strategy name means the connection is not load balanced; the partitioning attribute is passed along either way.
        final String loadBalanceStrategyName = proposed.getLoadBalanceStrategy();
        final LoadBalanceStrategy loadBalanceStrategy = loadBalanceStrategyName == null
                ? LoadBalanceStrategy.DO_NOT_LOAD_BALANCE
                : LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
        queue.setLoadBalanceStrategy(loadBalanceStrategy, proposed.getPartitioningAttribute());

        // A missing compression name means no compression.
        final String compressionName = proposed.getLoadBalanceCompression();
        final LoadBalanceCompression compression = compressionName == null
                ? LoadBalanceCompression.DO_NOT_COMPRESS
                : LoadBalanceCompression.valueOf(compressionName);
        queue.setLoadBalanceCompression(compression);
    }

    /**
     * Creates a connection in the destination group from the proposed versioned connection, configures it
     * via {@code updateConnection} and notifies the flow manager that it was added.
     *
     * @param destinationGroup the group in which the connection's endpoints are looked up and to which the connection is added
     * @param proposed the versioned connection to create
     * @param componentIdSeed seed passed to {@code generateUuid} when deriving the connection's identifier
     * @return the newly created connection
     * @throws IllegalArgumentException if the source or destination of the proposed connection cannot be
     *             found in the destination group
     */
    private Connection addConnection(final ProcessGroup destinationGroup, final VersionedConnection proposed, final String componentIdSeed) {
        final Connectable source = getConnectable(destinationGroup, proposed.getSource());
        if (source == null) {
            // Report the identifier of the missing source component, not the identifier of the connection itself.
            throw new IllegalArgumentException("Connection has a source with identifier " + proposed.getSource().getId()
                    + " but no component could be found in the Process Group with a corresponding identifier");
        }

        final Connectable destination = getConnectable(destinationGroup, proposed.getDestination());
        if (destination == null) {
            throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getDestination().getId()
                    + " but no component could be found in the Process Group with a corresponding identifier");
        }

        // NOTE(review): the identifier is derived from the destination component's ID rather than the group's ID.
        // Left unchanged because altering the inputs to generateUuid would change the identifiers that are generated.
        final Connection connection = flowController.createConnection(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName(), source, destination,
                proposed.getSelectedRelationships());
        connection.setVersionedComponentId(proposed.getIdentifier());
        destinationGroup.addConnection(connection);
        updateConnection(connection, proposed);

        flowManager.onConnectionAdded(connection);
        return connection;
    }

    /**
     * Locates the component within the given Process Group that corresponds to the given versioned
     * connection endpoint. Components are matched on their Versioned Component ID; a component that has
     * none is matched on the ID that {@code NiFiRegistryFlowMapper.generateVersionedComponentId} derives
     * from its instance identifier.
     *
     * @param group the Process Group to search
     * @param connectableComponent the versioned description of the connection endpoint
     * @return the matching component, or {@code null} if none could be found
     * @throws IllegalArgumentException if a remote port is requested but the Remote Process Group it belongs
     *             to cannot be found in the given group
     */
    private Connectable getConnectable(final ProcessGroup group, final ConnectableComponent connectableComponent) {
        final String versionedId = connectableComponent.getId();

        switch (connectableComponent.getType()) {
            case FUNNEL:
                return group.getFunnels().stream()
                        .filter(funnel -> versionedId.equals(funnel.getVersionedComponentId().orElse(
                                NiFiRegistryFlowMapper.generateVersionedComponentId(funnel.getIdentifier()))))
                        .findAny()
                        .orElse(null);
            case INPUT_PORT: {
                // First choice: an input port that lives directly in this group.
                final Port directPort = group.getInputPorts().stream()
                        .filter(port -> versionedId.equals(port.getVersionedComponentId().orElse(
                                NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier()))))
                        .findAny()
                        .orElse(null);

                if (directPort != null) {
                    return directPort;
                }

                // Second choice: an input port of the child group that the endpoint names as its group.
                final ProcessGroup owningChild = group.getProcessGroups().stream()
                        .filter(child -> child.getVersionedComponentId().orElse(
                                NiFiRegistryFlowMapper.generateVersionedComponentId(child.getIdentifier())).equals(connectableComponent.getGroupId()))
                        .findFirst()
                        .orElse(null);

                if (owningChild != null) {
                    return owningChild.getInputPorts().stream()
                            .filter(port -> versionedId.equals(port.getVersionedComponentId().orElse(
                                    NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier()))))
                            .findAny()
                            .orElse(null);
                }

                // Last resort: older versions did not properly map Versioned Component IDs to the parent groups
                // of Ports, so when no child group matches the given group id every child group is searched.
                return group.getProcessGroups().stream()
                        .flatMap(child -> child.getInputPorts().stream())
                        .filter(port -> versionedId.equals(port.getVersionedComponentId().orElse(
                                NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier()))))
                        .findAny()
                        .orElse(null);
            }
            case OUTPUT_PORT: {
                // First choice: an output port that lives directly in this group.
                final Port directPort = group.getOutputPorts().stream()
                        .filter(port -> versionedId.equals(port.getVersionedComponentId().orElse(
                                NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier()))))
                        .findAny()
                        .orElse(null);

                if (directPort != null) {
                    return directPort;
                }

                // Second choice: an output port of the child group that the endpoint names as its group.
                final ProcessGroup owningChild = group.getProcessGroups().stream()
                        .filter(child -> child.getVersionedComponentId().orElse(
                                NiFiRegistryFlowMapper.generateVersionedComponentId(child.getIdentifier())).equals(connectableComponent.getGroupId()))
                        .findFirst()
                        .orElse(null);

                if (owningChild != null) {
                    return owningChild.getOutputPorts().stream()
                            .filter(port -> versionedId.equals(port.getVersionedComponentId().orElse(
                                    NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier()))))
                            .findAny()
                            .orElse(null);
                }

                // Last resort: older versions did not properly map Versioned Component IDs to the parent groups
                // of Ports, so when no child group matches the given group id every child group is searched.
                return group.getProcessGroups().stream()
                        .flatMap(child -> child.getOutputPorts().stream())
                        .filter(port -> versionedId.equals(port.getVersionedComponentId().orElse(
                                NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier()))))
                        .findAny()
                        .orElse(null);
            }
            case PROCESSOR:
                return group.getProcessors().stream()
                        .filter(processor -> versionedId.equals(processor.getVersionedComponentId().orElse(
                                NiFiRegistryFlowMapper.generateVersionedComponentId(processor.getIdentifier()))))
                        .findAny()
                        .orElse(null);
            case REMOTE_INPUT_PORT: {
                final String rpgId = connectableComponent.getGroupId();
                final RemoteProcessGroup rpg = group.getRemoteProcessGroups().stream()
                        .filter(candidate -> rpgId.equals(candidate.getVersionedComponentId().orElse(
                                NiFiRegistryFlowMapper.generateVersionedComponentId(candidate.getIdentifier()))))
                        .findAny()
                        .orElse(null);

                if (rpg == null) {
                    throw new IllegalArgumentException("Connection refers to a Port with ID " + versionedId + " within Remote Process Group with ID "
                            + rpgId + " but could not find a Remote Process Group corresponding to that ID");
                }

                // Prefer a match on ID; fall back to a match on the port's name.
                final RemoteGroupPort portById = rpg.getInputPorts().stream()
                        .filter(port -> versionedId.equals(port.getVersionedComponentId().orElse(
                                NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier()))))
                        .findAny()
                        .orElse(null);

                if (portById != null) {
                    return portById;
                }

                return rpg.getInputPorts().stream()
                        .filter(port -> connectableComponent.getName().equals(port.getName()))
                        .findAny()
                        .orElse(null);
            }
            case REMOTE_OUTPUT_PORT: {
                final String rpgId = connectableComponent.getGroupId();
                final RemoteProcessGroup rpg = group.getRemoteProcessGroups().stream()
                        .filter(candidate -> rpgId.equals(candidate.getVersionedComponentId().orElse(
                                NiFiRegistryFlowMapper.generateVersionedComponentId(candidate.getIdentifier()))))
                        .findAny()
                        .orElse(null);

                if (rpg == null) {
                    throw new IllegalArgumentException("Connection refers to a Port with ID " + versionedId + " within Remote Process Group with ID "
                            + rpgId + " but could not find a Remote Process Group corresponding to that ID");
                }

                // Prefer a match on ID; fall back to a match on the port's name.
                final RemoteGroupPort portById = rpg.getOutputPorts().stream()
                        .filter(port -> versionedId.equals(port.getVersionedComponentId().orElse(
                                NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier()))))
                        .findAny()
                        .orElse(null);

                if (portById != null) {
                    return portById;
                }

                return rpg.getOutputPorts().stream()
                        .filter(port -> connectableComponent.getName().equals(port.getName()))
                        .findAny()
                        .orElse(null);
            }
        }

        // Any type not handled above has no corresponding component.
        return null;
    }

    /**
     * Applies the proposed versioned configuration to an existing Controller Service. Validation
     * triggering is paused for the duration of the update and always resumed afterwards.
     *
     * @param service the Controller Service to update
     * @param proposed the versioned definition whose annotation data, comments, name, properties and bundle are applied
     */
    private void updateControllerService(final ControllerServiceNode service, final VersionedControllerService proposed) {
        service.pauseValidationTrigger();
        try {
            service.setAnnotationData(proposed.getAnnotationData());
            service.setComments(proposed.getComments());
            service.setName(proposed.getName());

            final Map<String, String> resolvedProperties = populatePropertiesMap(service, proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup());
            service.setProperties(resolvedProperties, true);

            // Nothing more to do when the service already runs from the proposed bundle.
            if (isEqual(service.getBundleCoordinate(), proposed.getBundle())) {
                return;
            }

            // The bundle differs, so the service is reloaded from the proposed bundle.
            final BundleCoordinate proposedCoordinate = toCoordinate(proposed.getBundle());
            final List<PropertyDescriptor> currentDescriptors = new ArrayList<>(service.getRawPropertyValues().keySet());
            final Set<URL> classpathUrls = service.getAdditionalClasspathResources(currentDescriptors);
            flowController.getReloadComponent().reload(service, proposed.getType(), proposedCoordinate, classpathUrls);
        } finally {
            service.resumeValidationTrigger();
        }
    }

    /**
     * Determines whether a versioned bundle identifies the same bundle as the given coordinate.
     *
     * @param coordinate the coordinate of the bundle currently in use
     * @param bundle the versioned bundle to compare against
     * @return {@code true} if group, artifact and version all match, {@code false} otherwise
     */
    private boolean isEqual(final BundleCoordinate coordinate, final Bundle bundle) {
        // Compared in the order group, artifact, version; evaluation stops at the first mismatch.
        return bundle.getGroup().equals(coordinate.getGroup())
                && bundle.getArtifact().equals(coordinate.getId())
                && bundle.getVersion().equals(coordinate.getVersion());
    }

    /**
     * Converts a versioned bundle into a {@link BundleCoordinate} built from its group, artifact and version.
     *
     * @param bundle the versioned bundle to convert
     * @return a new coordinate describing the same bundle
     */
    private BundleCoordinate toCoordinate(final Bundle bundle) {
        final String bundleGroup = bundle.getGroup();
        final String bundleArtifact = bundle.getArtifact();
        final String bundleVersion = bundle.getVersion();
        return new BundleCoordinate(bundleGroup, bundleArtifact, bundleVersion);
    }

    /**
     * Creates a Controller Service from the proposed versioned definition, adds it to the destination
     * group and then applies the proposed configuration to it.
     *
     * @param destination the group to which the service is added
     * @param proposed the versioned definition of the service
     * @param componentIdSeed seed passed to {@code generateUuid} when deriving the service's identifier
     * @return the newly created service
     */
    private ControllerServiceNode addControllerService(final ProcessGroup destination, final VersionedControllerService proposed, final String componentIdSeed) {
        final String serviceType = proposed.getType();
        final String serviceId = generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed);
        final BundleCoordinate bundleCoordinate = toCoordinate(proposed.getBundle());

        // The service is created as first-time-added and without any additional classpath URLs.
        final Set<URL> noAdditionalUrls = Collections.emptySet();
        final ControllerServiceNode createdService = flowManager.createControllerService(serviceType, serviceId, bundleCoordinate, noAdditionalUrls, true, true);
        createdService.setVersionedComponentId(proposed.getIdentifier());

        destination.addControllerService(createdService);
        updateControllerService(createdService, proposed);

        return createdService;
    }

    /**
     * Moves the funnel to the position given by the proposed versioned funnel.
     *
     * @param funnel the funnel to update
     * @param proposed the versioned funnel that supplies the position
     */
    private void updateFunnel(final Funnel funnel, final VersionedFunnel proposed) {
        final Position proposedPosition = new Position(proposed.getPosition().getX(), proposed.getPosition().getY());
        funnel.setPosition(proposedPosition);
    }

    /**
     * Creates a funnel for the proposed versioned funnel, adds it to the destination group and applies
     * the proposed settings.
     *
     * @param destination the group to which the funnel is added
     * @param proposed the versioned funnel to create
     * @param componentIdSeed seed passed to {@code generateUuid} when deriving the funnel's identifier
     * @return the newly created funnel
     */
    private Funnel addFunnel(final ProcessGroup destination, final VersionedFunnel proposed, final String componentIdSeed) {
        final String funnelId = generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed);

        final Funnel createdFunnel = flowManager.createFunnel(funnelId);
        createdFunnel.setVersionedComponentId(proposed.getIdentifier());
        destination.addFunnel(createdFunnel);
        updateFunnel(createdFunnel, proposed);

        return createdFunnel;
    }

    /**
     * Applies the comments, name, position and concurrent task count of the proposed versioned port to the given port.
     *
     * @param port the port to update
     * @param proposed the versioned port that supplies the settings
     * @param temporaryName if not {@code null}, used as the port's name instead of the proposed name
     */
    private void updatePort(final Port port, final VersionedPort proposed, final String temporaryName) {
        final String effectiveName;
        if (temporaryName == null) {
            effectiveName = proposed.getName();
        } else {
            effectiveName = temporaryName;
        }

        port.setComments(proposed.getComments());
        port.setName(effectiveName);

        final Position proposedPosition = new Position(proposed.getPosition().getX(), proposed.getPosition().getY());
        port.setPosition(proposedPosition);
        port.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
    }

    /**
     * Creates an input port for the proposed versioned port, adds it to the destination group and applies
     * the proposed settings. A public input port is created when the proposed port allows remote access,
     * otherwise a local input port is created.
     *
     * @param destination the group to which the port is added
     * @param proposed the versioned port to create
     * @param componentIdSeed seed passed to {@code generateUuid} when deriving the port's identifier
     * @param temporaryName if not {@code null}, used as the port's name instead of the proposed name
     * @return the newly created port
     */
    private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed, final String temporaryName) {
        final String portName = temporaryName == null ? proposed.getName() : temporaryName;

        // The remote-access flag is read before the identifier is generated, as in the branch-per-type form.
        final boolean remotelyAccessible = proposed.isAllowRemoteAccess();
        final String portId = generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed);

        final Port createdPort = remotelyAccessible
                ? flowManager.createPublicInputPort(portId, portName)
                : flowManager.createLocalInputPort(portId, portName);

        createdPort.setVersionedComponentId(proposed.getIdentifier());
        destination.addInputPort(createdPort);
        updatePort(createdPort, proposed, temporaryName);

        return createdPort;
    }

    /**
     * Creates an output port for the proposed versioned port, adds it to the destination group and applies
     * the proposed settings. A public output port is created when the proposed port allows remote access,
     * otherwise a local output port is created.
     *
     * @param destination the group to which the port is added
     * @param proposed the versioned port to create
     * @param componentIdSeed seed passed to {@code generateUuid} when deriving the port's identifier
     * @param temporaryName if not {@code null}, used as the port's name instead of the proposed name
     * @return the newly created port
     */
    private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed, final String temporaryName) {
        final String portName = temporaryName == null ? proposed.getName() : temporaryName;

        // The remote-access flag is read before the identifier is generated, as in the branch-per-type form.
        final boolean remotelyAccessible = proposed.isAllowRemoteAccess();
        final String portId = generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed);

        final Port createdPort = remotelyAccessible
                ? flowManager.createPublicOutputPort(portId, portName)
                : flowManager.createLocalOutputPort(portId, portName);

        createdPort.setVersionedComponentId(proposed.getIdentifier());
        destination.addOutputPort(createdPort);
        updatePort(createdPort, proposed, temporaryName);

        return createdPort;
    }

    /**
     * Creates a label for the proposed versioned label, adds it to the destination group and applies the
     * proposed settings.
     *
     * @param destination the group to which the label is added
     * @param proposed the versioned label to create
     * @param componentIdSeed seed passed to {@code generateUuid} when deriving the label's identifier
     * @return the newly created label
     */
    private Label addLabel(final ProcessGroup destination, final VersionedLabel proposed, final String componentIdSeed) {
        final String labelId = generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed);

        final Label createdLabel = flowManager.createLabel(labelId, proposed.getLabel());
        createdLabel.setVersionedComponentId(proposed.getIdentifier());
        destination.addLabel(createdLabel);
        updateLabel(createdLabel, proposed);

        return createdLabel;
    }

    /**
     * Applies the position, size, style and text of the proposed versioned label to the given label.
     *
     * @param label the label to update
     * @param proposed the versioned label that supplies the settings
     */
    private void updateLabel(final Label label, final VersionedLabel proposed) {
        final Position proposedPosition = new Position(proposed.getPosition().getX(), proposed.getPosition().getY());
        label.setPosition(proposedPosition);

        final Size proposedSize = new Size(proposed.getWidth(), proposed.getHeight());
        label.setSize(proposedSize);

        label.setStyle(proposed.getStyle());
        label.setValue(proposed.getLabel());
    }

    /**
     * Creates a processor for the proposed versioned processor, adds it to the destination group and
     * applies the proposed configuration.
     *
     * @param destination the group to which the processor is added
     * @param proposed the versioned processor to create
     * @param componentIdSeed seed passed to {@code generateUuid} when deriving the processor's identifier
     * @return the newly created processor
     * @throws ProcessorInstantiationException declared by this method; see {@code updateProcessor}
     */
    private ProcessorNode addProcessor(final ProcessGroup destination, final VersionedProcessor proposed, final String componentIdSeed) throws ProcessorInstantiationException {
        final BundleCoordinate bundleCoordinate = toCoordinate(proposed.getBundle());
        final String processorType = proposed.getType();
        final String processorId = generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed);

        final ProcessorNode createdProcessor = flowManager.createProcessor(processorType, processorId, bundleCoordinate, true);
        createdProcessor.setVersionedComponentId(proposed.getIdentifier());

        destination.addProcessor(createdProcessor);
        updateProcessor(createdProcessor, proposed);

        return createdProcessor;
    }

    /**
     * Applies the proposed versioned configuration to an existing processor. Validation triggering is
     * paused for the duration of the update and always resumed afterwards.
     *
     * @param processor the processor to update
     * @param proposed the versioned processor that supplies the configuration
     * @throws ProcessorInstantiationException declared by this method's signature
     */
    private void updateProcessor(final ProcessorNode processor, final VersionedProcessor proposed) throws ProcessorInstantiationException {
        processor.pauseValidationTrigger();
        try {
            processor.setAnnotationData(proposed.getAnnotationData());
            processor.setBulletinLevel(LogLevel.valueOf(proposed.getBulletinLevel()));
            processor.setComments(proposed.getComments());
            processor.setName(proposed.getName());
            processor.setPenalizationPeriod(proposed.getPenaltyDuration());

            final Map<String, String> resolvedProperties = populatePropertiesMap(processor, proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup());
            processor.setProperties(resolvedProperties, true);
            processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS);
            processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
            processor.setScheduldingPeriod(proposed.getSchedulingPeriod());
            processor.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
            processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode()));
            processor.setStyle(proposed.getStyle());
            processor.setYieldPeriod(proposed.getYieldDuration());

            final Position proposedPosition = new Position(proposed.getPosition().getX(), proposed.getPosition().getY());
            processor.setPosition(proposedPosition);

            // Disable the processor when the versioned flow says so; otherwise re-enable it if it is currently disabled.
            final boolean proposedDisabled = proposed.getScheduledState() == org.apache.nifi.registry.flow.ScheduledState.DISABLED;
            if (proposedDisabled) {
                processor.getProcessGroup().disableProcessor(processor);
            } else if (processor.getScheduledState() == ScheduledState.DISABLED) {
                processor.getProcessGroup().enableProcessor(processor);
            }

            // Reload the processor from the proposed bundle only when it differs from the current one.
            final boolean bundleChanged = !isEqual(processor.getBundleCoordinate(), proposed.getBundle());
            if (bundleChanged) {
                final BundleCoordinate proposedCoordinate = toCoordinate(proposed.getBundle());
                final List<PropertyDescriptor> currentDescriptors = new ArrayList<>(processor.getProperties().keySet());
                final Set<URL> classpathUrls = processor.getAdditionalClasspathResources(currentDescriptors);
                flowController.getReloadComponent().reload(processor, proposed.getType(), proposedCoordinate, classpathUrls);
            }
        } finally {
            processor.resumeValidationTrigger();
        }
    }


    /**
     * Builds the map of property names to values that should be applied to a component when it is
     * updated from a versioned flow.
     * <p>
     * The map initially holds a {@code null} value for every non-sensitive property in the component's
     * raw property values. If proposed properties are given, each property that appears either in the
     * proposed values or in the component's current properties is then resolved:
     * <ul>
     * <li>If the proposed descriptor identifies a Controller Service and the component's current value
     * resolves through {@code findAncestorControllerService}, the current value is kept. Otherwise the
     * proposed Versioned Component ID is translated via {@code getServiceInstanceId}, falling back to the
     * proposed value itself when no instance is found.</li>
     * <li>Any other property takes the proposed value.</li>
     * <li>A sensitive property whose resolved value is {@code null} is left out of the map unless the
     * component's current configuration for it contains Parameter references.</li>
     * </ul>
     *
     * @param componentNode the component whose properties are being updated
     * @param proposedProperties the property values from the versioned flow; may be {@code null}
     * @param proposedDescriptors the property descriptors from the versioned flow, keyed by property name
     * @param group the group from which the search for Controller Service instances starts
     * @return the map of property names to the values to apply
     */
    private Map<String, String> populatePropertiesMap(final ComponentNode componentNode, final Map<String, String> proposedProperties,
                                                      final Map<String, VersionedPropertyDescriptor> proposedDescriptors, final ProcessGroup group) {

        // since VersionedPropertyDescriptor currently doesn't know if it is sensitive or not,
        // keep track of which property descriptors are sensitive from the current properties
        final Set<String> sensitiveProperties = new HashSet<>();

        // Non-sensitive properties start out mapped to null; sensitive ones are only recorded by name.
        final Map<String, String> fullPropertyMap = new HashMap<>();
        for (final PropertyDescriptor property : componentNode.getRawPropertyValues().keySet()) {
            if (property.isSensitive()) {
                sensitiveProperties.add(property.getName());
            } else {
                fullPropertyMap.put(property.getName(), null);
            }
        }

        if (proposedProperties != null) {
            // Build a Set of all properties that are included in either the currently configured property values or the proposed values.
            final Set<String> updatedPropertyNames = new HashSet<>();
            updatedPropertyNames.addAll(proposedProperties.keySet());
            componentNode.getProperties().keySet().stream()
                .map(PropertyDescriptor::getName)
                .forEach(updatedPropertyNames::add);

            for (final String propertyName : updatedPropertyNames) {
                final VersionedPropertyDescriptor descriptor = proposedDescriptors.get(propertyName);

                String value;
                if (descriptor != null && descriptor.getIdentifiesControllerService()) {

                    // Need to determine if the component's property descriptor for this service is already set to an id
                    // of an existing service that is outside the current processor group, and if it is we want to leave
                    // the property set to that value
                    String existingExternalServiceId = null;
                    final PropertyDescriptor componentDescriptor = componentNode.getPropertyDescriptor(propertyName);
                    if (componentDescriptor != null) {
                        final String componentDescriptorValue = componentNode.getEffectivePropertyValue(componentDescriptor);
                        if (componentDescriptorValue != null) {
                            // The search for the existing service starts at this group's parent.
                            final ControllerServiceNode serviceNode = findAncestorControllerService(componentDescriptorValue, getParent());
                            if (serviceNode != null) {
                                existingExternalServiceId = componentDescriptorValue;
                            }
                        }
                    }

                    // If the component's property descriptor is not already set to an id of an existing external service,
                    // then we need to take the Versioned Component ID and resolve this to the instance ID of the service
                    if (existingExternalServiceId == null) {
                        final String serviceVersionedComponentId = proposedProperties.get(propertyName);
                        String instanceId = getServiceInstanceId(serviceVersionedComponentId, group);
                        // If no service instance matches, the proposed value is used as-is.
                        value = instanceId == null ? serviceVersionedComponentId : instanceId;
                    } else {
                        value = existingExternalServiceId;
                    }

                } else {
                    value = proposedProperties.get(propertyName);
                }

                // skip any sensitive properties that are not populated so we can retain whatever is currently set. We do this because sensitive properties are not stored in the registry
                // unless the value is a reference to a Parameter. If the value in the registry is null, it indicates that the sensitive value was removed, so we want to keep the currently
                // populated value. The exception to this rule is if the currently configured value is a Parameter Reference and the Versioned Flow is empty. In this case, it implies
                // that the Versioned Flow has changed from a Parameter Reference to an explicit value. In this case, we do in fact want to change the value of the Sensitive Property from
                // the current parameter reference to an unset value.
                if (sensitiveProperties.contains(propertyName) && value == null) {
                    final PropertyConfiguration propertyConfiguration = componentNode.getProperty(componentNode.getPropertyDescriptor(propertyName));
                    if (propertyConfiguration == null) {
                        continue;
                    }

                    // No parameter references. Property currently is set to an explicit value. We don't want to change it.
                    if (propertyConfiguration.getParameterReferences().isEmpty()) {
                        continue;
                    }

                    // Once we reach this point, the property is currently configured to reference a Parameter while the Versioned Flow
                    // supplies no value for it, so we fall through and update the value to null.
                }

                fullPropertyMap.put(propertyName, value);
            }
        }

        return fullPropertyMap;
    }

    /**
     * Resolves the Versioned Component ID of a Controller Service to the instance identifier of a
     * matching service, searching the given group first and then each of its ancestors in turn.
     * A service without a Versioned Component ID is matched on the ID that
     * {@code NiFiRegistryFlowMapper.generateVersionedComponentId} derives from its instance identifier.
     *
     * @param serviceVersionedComponentId the Versioned Component ID to resolve
     * @param group the group at which the search starts
     * @return the instance identifier of the first matching service, or {@code null} if no group in the
     *         chain contains a match
     */
    private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) {
        ProcessGroup currentGroup = group;

        // Walk from the starting group up through its parents until a match is found or the chain ends.
        do {
            for (final ControllerServiceNode candidate : currentGroup.getControllerServices(false)) {
                final String candidateVersionedId = candidate.getVersionedComponentId().orElse(
                        NiFiRegistryFlowMapper.generateVersionedComponentId(candidate.getIdentifier()));
                if (candidateVersionedId.equals(serviceVersionedComponentId)) {
                    return candidate.getIdentifier();
                }
            }

            currentGroup = currentGroup.getParent();
        } while (currentGroup != null);

        return null;
    }

    /**
     * Creates a Remote Process Group for the proposed versioned definition, adds it to the destination
     * group and applies the proposed configuration.
     *
     * @param destination the group to which the Remote Process Group is added
     * @param proposed the versioned Remote Process Group to create
     * @param componentIdSeed seed passed to {@code generateUuid} when deriving component identifiers
     * @return the newly created Remote Process Group
     */
    private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) {
        final String rpgId = generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed);

        final RemoteProcessGroup createdRpg = flowManager.createRemoteProcessGroup(rpgId, proposed.getTargetUris());
        createdRpg.setVersionedComponentId(proposed.getIdentifier());

        destination.addRemoteProcessGroup(createdRpg);
        updateRemoteProcessGroup(createdRpg, proposed, componentIdSeed);

        return createdRpg;
    }

    /**
     * Applies the proposed versioned configuration to an existing Remote Process Group, including its
     * input and output port descriptors, which are rebuilt from the proposed ports.
     *
     * @param rpg the Remote Process Group to update
     * @param proposed the versioned Remote Process Group that supplies the configuration
     * @param componentIdSeed seed passed on to {@code createPortDescriptor} for deriving port identifiers
     */
    private void updateRemoteProcessGroup(final RemoteProcessGroup rpg, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) {
        rpg.setComments(proposed.getComments());
        rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout());

        // A missing set of proposed input ports is treated as an empty set.
        final Set<RemoteProcessGroupPortDescriptor> inputPortDescriptors;
        if (proposed.getInputPorts() == null) {
            inputPortDescriptors = Collections.emptySet();
        } else {
            inputPortDescriptors = proposed.getInputPorts().stream()
                    .map(inputPort -> createPortDescriptor(inputPort, componentIdSeed, rpg.getIdentifier()))
                    .collect(Collectors.toSet());
        }
        rpg.setInputPorts(inputPortDescriptors, false);

        rpg.setName(proposed.getName());
        rpg.setNetworkInterface(proposed.getLocalNetworkInterface());

        // A missing set of proposed output ports is treated as an empty set.
        final Set<RemoteProcessGroupPortDescriptor> outputPortDescriptors;
        if (proposed.getOutputPorts() == null) {
            outputPortDescriptors = Collections.emptySet();
        } else {
            outputPortDescriptors = proposed.getOutputPorts().stream()
                    .map(outputPort -> createPortDescriptor(outputPort, componentIdSeed, rpg.getIdentifier()))
                    .collect(Collectors.toSet());
        }
        rpg.setOutputPorts(outputPortDescriptors, false);

        final Position proposedPosition = new Position(proposed.getPosition().getX(), proposed.getPosition().getY());
        rpg.setPosition(proposedPosition);
        rpg.setProxyHost(proposed.getProxyHost());
        rpg.setProxyPort(proposed.getProxyPort());
        rpg.setProxyUser(proposed.getProxyUser());
        rpg.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(proposed.getTransportProtocol()));
        rpg.setYieldDuration(proposed.getYieldDuration());
    }

    /**
     * Builds a port descriptor for a port of a Remote Process Group from its versioned definition.
     *
     * @param proposed the versioned remote port
     * @param componentIdSeed seed passed to {@code generateUuid} when deriving the descriptor's identifier
     * @param rpgId identifier of the Remote Process Group, used together with the port's versioned
     *            identifier to derive the descriptor's identifier
     * @return the populated descriptor
     */
    private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed, final String componentIdSeed, final String rpgId) {
        final StandardRemoteProcessGroupPortDescriptor portDescriptor = new StandardRemoteProcessGroupPortDescriptor();
        portDescriptor.setVersionedComponentId(proposed.getIdentifier());

        // Batch settings are copied only when the versioned port defines them.
        final BatchSize proposedBatch = proposed.getBatchSize();
        if (proposedBatch != null) {
            portDescriptor.setBatchCount(proposedBatch.getCount());
            portDescriptor.setBatchDuration(proposedBatch.getDuration());
            portDescriptor.setBatchSize(proposedBatch.getSize());
        }

        portDescriptor.setComments(proposed.getComments());
        portDescriptor.setConcurrentlySchedulableTaskCount(proposed.getConcurrentlySchedulableTaskCount());
        portDescriptor.setGroupId(proposed.getRemoteGroupId());
        portDescriptor.setTargetId(proposed.getTargetId());

        final String descriptorId = generateUuid(proposed.getIdentifier(), rpgId, componentIdSeed);
        portDescriptor.setId(descriptorId);
        portDescriptor.setName(proposed.getName());
        portDescriptor.setUseCompression(proposed.isUseCompression());

        return portDescriptor;
    }


    /**
     * Computes the differences between this group's current flow and the versioned snapshot it is
     * tracking.
     * <p>
     * A full comparison is performed rather than tracking a "changed" flag, because a flag could never
     * be cleared again: if a user adds a processor and subsequently removes it, the flow is once more
     * identical to the snapshot and must not be reported as modified.
     *
     * @return the set of relevant differences, or {@code null} if this group is not under version control
     *         or the versioned snapshot has not yet been retrieved from the Flow Registry
     * @throws RuntimeException if the comparison fails; the original failure is attached as the cause
     */
    private Set<FlowDifference> getModifications() {
        final StandardVersionControlInformation versionControl = versionControlInfo.get();

        // Without version control information, or before the snapshot has been retrieved from the
        // Flow Registry, there is nothing to compare against, so null is returned.
        if (versionControl == null || versionControl.getFlowSnapshot() == null) {
            return null;
        }

        try {
            final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
            final VersionedProcessGroup localGroup = flowMapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), false);

            final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
            final ComparableDataFlow versionedFlow = new StandardComparableDataFlow("Versioned Flow", versionControl.getFlowSnapshot());

            final FlowComparator comparator = new StandardFlowComparator(versionedFlow, localFlow, getAncestorServiceIds(), new EvolvingDifferenceDescriptor());
            final FlowComparison flowComparison = comparator.compare();

            // Drop the kinds of differences that are not reported as modifications.
            final Set<FlowDifference> relevantDifferences = flowComparison.getDifferences().stream()
                .filter(diff -> diff.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
                .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
                .filter(FlowDifferenceFilters.FILTER_PUBLIC_PORT_NAME_CHANGES)
                .filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
                .filter(diff -> !FlowDifferenceFilters.isNewPropertyWithDefaultValue(diff, flowManager))
                .filter(diff -> !FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(diff, localGroup, flowManager))
                .filter(diff -> !FlowDifferenceFilters.isScheduledStateNew(diff))
                .collect(Collectors.toCollection(HashSet::new));

            LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", relevantDifferences.size(), relevantDifferences);
            return relevantDifferences;
        } catch (final RuntimeException e) {
            throw new RuntimeException("Could not compute differences between local flow and Versioned Flow in NiFi Registry for " + this, e);
        }
    }


    /**
     * Verifies that this Process Group can be updated to the given versioned flow, throwing an exception
     * describing the first problem found. All checks are performed while holding the read lock.
     * <p>
     * The following conditions are verified, in order:
     * <ol>
     *   <li>If this group is under version control: the flow identifier of the given snapshot matches, the group
     *       has no local modifications (only when {@code verifyNotDirty} is set), and no descendant group has
     *       local modifications.</li>
     *   <li>Components missing from the proposed flow can be removed (see
     *       {@code verifyCanRemoveMissingComponents}).</li>
     *   <li>No Input Port that would be removed has an incoming connection, and no Output Port that would be
     *       removed has an outgoing connection.</li>
     *   <li>Every Processor and Controller Service that would be added has a known bundle.</li>
     *   <li>Every Connection that would be added uses Prioritizers that can be created and a known
     *       Load Balance Strategy.</li>
     * </ol>
     *
     * @param updatedFlow             the proposed flow snapshot
     * @param verifyConnectionRemoval whether to verify that connections absent from the proposed flow can be removed
     * @param verifyNotDirty          whether to verify that this group has no local modifications
     * @throws IllegalStateException    if the current state of this group prevents the update
     * @throws IllegalArgumentException if the proposed flow references a bundle, Prioritizer or Load Balance
     *                                  Strategy that is not available
     */
    @Override
    public void verifyCanUpdate(final VersionedFlowSnapshot updatedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
        readLock.lock();
        try {
            // flow id match and not dirty check concepts are only applicable to versioned flows
            final VersionControlInformation vci = getVersionControlInformation();
            if (vci != null) {
                if (!vci.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
                    throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
                }

                if (verifyNotDirty) {
                    final VersionedFlowState flowState = vci.getStatus().getState();
                    final boolean modified = flowState == VersionedFlowState.LOCALLY_MODIFIED || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;

                    if (modified) {
                        // The flow comparison is only needed to build the error report, so it is computed
                        // only once we know the group is modified.
                        final Set<FlowDifference> modifications = getModifications();

                        // getModifications() returns null when the differences cannot be determined. The group
                        // is still reported as modified, so fail with the intended exception rather than an NPE.
                        final String modificationCount = modifications == null ? "unknown" : String.valueOf(modifications.size());
                        final String changes = modifications == null
                                ? "<differences could not be determined>"
                                : modifications.stream()
                                        .map(FlowDifference::toString)
                                        .collect(Collectors.joining("\n"));

                        LOG.error("Cannot change the Version of the flow for {} because the Process Group has been modified ({} modifications) "
                                        + "since it was last synchronized with the Flow Registry. The following differences were found:\n{}",
                                this, modificationCount, changes);

                        throw new IllegalStateException("Cannot change the Version of the flow for " + this
                                + " because the Process Group has been modified (" + modificationCount
                                + " modifications) since it was last synchronized with the Flow Registry. The Process Group must be"
                                + " reverted to its original form before changing the version.");
                    }
                }

                verifyNoDescendantsWithLocalModifications("be updated");
            }

            final VersionedProcessGroup flowContents = updatedFlow.getFlowContents();

            // Ensure no deleted child process groups contain templates and optionally no deleted connections contain data
            // in their queue. Note that this check enforces ancestry among the group components to avoid a scenario where
            // a component is matched by id, but it does not exist in the same hierarchy and thus will be removed and
            // re-added when the update is performed
            verifyCanRemoveMissingComponents(this, flowContents, verifyConnectionRemoval);

            // Determine which input ports were removed from this process group: start with every current port keyed by its
            // versioned component id (or a generated one if it has none), then drop those present in the proposed flow.
            final Map<String, Port> removedInputPortsByVersionId = new HashMap<>();
            getInputPorts().forEach(port -> removedInputPortsByVersionId.put(port.getVersionedComponentId().orElse(
                    NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier())), port));
            flowContents.getInputPorts().stream()
                    .map(VersionedPort::getIdentifier)
                    .forEach(removedInputPortsByVersionId::remove);

            // Ensure that there are no incoming connections for any Input Port that was removed.
            for (final Port inputPort : removedInputPortsByVersionId.values()) {
                final List<Connection> incomingConnections = inputPort.getIncomingConnections();
                if (!incomingConnections.isEmpty()) {
                    throw new IllegalStateException(this + " cannot be updated to the proposed flow because the proposed flow "
                            + "does not contain the Input Port " + inputPort + " and the Input Port currently has an incoming connection");
                }
            }

            // Determine which output ports were removed from this process group
            final Map<String, Port> removedOutputPortsByVersionId = new HashMap<>();
            getOutputPorts().forEach(port -> removedOutputPortsByVersionId.put(port.getVersionedComponentId().orElse(
                    NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier())), port));
            flowContents.getOutputPorts().stream()
                    .map(VersionedPort::getIdentifier)
                    .forEach(removedOutputPortsByVersionId::remove);

            // Ensure that there are no outgoing connections for any Output Port that was removed.
            for (final Port outputPort : removedOutputPortsByVersionId.values()) {
                final Set<Connection> outgoingConnections = outputPort.getConnections();
                if (!outgoingConnections.isEmpty()) {
                    throw new IllegalStateException(this + " cannot be updated to the proposed flow because the proposed flow "
                            + "does not contain the Output Port " + outputPort + " and the Output Port currently has an outgoing connection");
                }
            }

            // Ensure that all Processors are instantiable. Only processors that do not already exist in the
            // current flow (i.e., those that would be added) need to be checked.
            final Map<String, VersionedProcessor> proposedProcessors = new HashMap<>();
            findAllProcessors(flowContents, proposedProcessors);

            findAllProcessors().forEach(proc -> proposedProcessors.remove(proc.getVersionedComponentId().orElse(
                    NiFiRegistryFlowMapper.generateVersionedComponentId(proc.getIdentifier()))));

            for (final VersionedProcessor processorToAdd : proposedProcessors.values()) {
                final String processorToAddClass = processorToAdd.getType();
                final BundleCoordinate processorToAddCoordinate = toCoordinate(processorToAdd.getBundle());

                final boolean bundleExists = flowController.getExtensionManager().getBundles(processorToAddClass).stream()
                        .anyMatch(b -> processorToAddCoordinate.equals(b.getBundleDetails().getCoordinate()));

                if (!bundleExists) {
                    throw new IllegalArgumentException("Unknown bundle " + processorToAddCoordinate.toString() + " for processor type " + processorToAddClass);
                }
            }

            // Ensure that all Controller Services are instantiable
            final Map<String, VersionedControllerService> proposedServices = new HashMap<>();
            findAllControllerServices(flowContents, proposedServices);

            findAllControllerServices().forEach(service -> proposedServices.remove(service.getVersionedComponentId().orElse(
                    NiFiRegistryFlowMapper.generateVersionedComponentId(service.getIdentifier()))));

            for (final VersionedControllerService serviceToAdd : proposedServices.values()) {
                final String serviceToAddClass = serviceToAdd.getType();
                final BundleCoordinate serviceToAddCoordinate = toCoordinate(serviceToAdd.getBundle());

                final boolean bundleExists = flowController.getExtensionManager().getBundles(serviceToAddClass).stream()
                        .anyMatch(b -> serviceToAddCoordinate.equals(b.getBundleDetails().getCoordinate()));

                if (!bundleExists) {
                    throw new IllegalArgumentException("Unknown bundle " + serviceToAddCoordinate.toString() + " for service type " + serviceToAddClass);
                }
            }

            // Ensure that all Prioritizers are instantiate-able and that any load balancing configuration is correct
            // Enforcing ancestry on connection matching here is not important because all we're interested in is locating
            // new prioritizers and load balance strategy types so if a matching connection existed anywhere in the current
            // flow, then its prioritizer and load balance strategy are already validated
            final Map<String, VersionedConnection> proposedConnections = new HashMap<>();
            findAllConnections(flowContents, proposedConnections);

            findAllConnections().forEach(conn -> proposedConnections.remove(conn.getVersionedComponentId().orElse(
                    NiFiRegistryFlowMapper.generateVersionedComponentId(conn.getIdentifier()))));

            for (final VersionedConnection connectionToAdd : proposedConnections.values()) {
                if (connectionToAdd.getPrioritizers() != null) {
                    for (final String prioritizerType : connectionToAdd.getPrioritizers()) {
                        try {
                            flowManager.createPrioritizer(prioritizerType);
                        } catch (final Exception e) {
                            throw new IllegalArgumentException("Unable to create Prioritizer of type " + prioritizerType, e);
                        }
                    }
                }

                final String loadBalanceStrategyName = connectionToAdd.getLoadBalanceStrategy();
                if (loadBalanceStrategyName != null) {
                    try {
                        LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
                    } catch (final IllegalArgumentException iae) {
                        // Keep the original failure as the cause so the stack trace is not lost
                        throw new IllegalArgumentException("Unable to create Connection with Load Balance Strategy of '" + loadBalanceStrategyName
                                + "' because this is not a known Load Balance Strategy", iae);
                    }
                }
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Recursively collects every processor of the given versioned group and its descendant groups into the
     * given map, keyed by the processor's identifier. The group's own processors are added before its child
     * groups are visited.
     *
     * @param group the versioned process group to search
     * @param map   the map to populate
     */
    private void findAllProcessors(final VersionedProcessGroup group, final Map<String, VersionedProcessor> map) {
        group.getProcessors().forEach(versionedProcessor -> map.put(versionedProcessor.getIdentifier(), versionedProcessor));
        group.getProcessGroups().forEach(descendant -> findAllProcessors(descendant, map));
    }

    /**
     * Recursively collects every controller service of the given versioned group and its descendant groups
     * into the given map, keyed by the service's identifier. The group's own services are added before its
     * child groups are visited.
     *
     * @param group the versioned process group to search
     * @param map   the map to populate
     */
    private void findAllControllerServices(final VersionedProcessGroup group, final Map<String, VersionedControllerService> map) {
        group.getControllerServices().forEach(versionedService -> map.put(versionedService.getIdentifier(), versionedService));
        group.getProcessGroups().forEach(descendant -> findAllControllerServices(descendant, map));
    }

    /**
     * Recursively collects every connection of the given versioned group and its descendant groups into the
     * given map, keyed by the connection's identifier. The group's own connections are added before its
     * child groups are visited.
     *
     * @param group the versioned process group to search
     * @param map   the map to populate
     */
    private void findAllConnections(final VersionedProcessGroup group, final Map<String, VersionedConnection> map) {
        group.getConnections().forEach(versionedConnection -> map.put(versionedConnection.getIdentifier(), versionedConnection));
        group.getProcessGroups().forEach(descendant -> findAllConnections(descendant, map));
    }

    /**
     * Matches the components of the given process group against the proposed versioned process group and verifies
     * that any component missing from the proposal can safely be removed.
     * <p>
     * A child process group that has no match in the proposal may not contain Templates, neither directly nor in
     * any descendant. When {@code verifyConnectionRemoval} is set, connections without a match (either removed from
     * a matched group, or belonging to a removed group or one of its descendants) must have an empty queue.
     * Child groups that do have a match are verified recursively.
     *
     * @param processGroup            the current process group to examine
     * @param proposedGroup           the proposed versioned process group to match with
     * @param verifyConnectionRemoval whether or not to verify that connections that are not present in the proposed flow can be removed
     * @throws IllegalStateException if a missing component cannot be removed
     */
    private void verifyCanRemoveMissingComponents(final ProcessGroup processGroup, final VersionedProcessGroup proposedGroup,
                                                  final boolean verifyConnectionRemoval) {
        if (verifyConnectionRemoval) {
            final Map<String, VersionedConnection> proposedConnections = proposedGroup.getConnections().stream()
                    .collect(Collectors.toMap(VersionedConnection::getIdentifier, Function.identity()));

            // A current connection with no counterpart in the proposal will be removed, so its queue must be empty
            for (final Connection existing : processGroup.getConnections()) {
                final String existingVersionedId = existing.getVersionedComponentId().orElse(
                        NiFiRegistryFlowMapper.generateVersionedComponentId(existing.getIdentifier()));

                if (proposedConnections.get(existingVersionedId) == null && !existing.getFlowFileQueue().isEmpty()) {
                    throw new IllegalStateException(this + " cannot be updated to the proposed flow because the proposed flow "
                            + "does not contain a match for " + existing + " and the connection currently has data in the queue.");
                }
            }
        }

        final Map<String, VersionedProcessGroup> proposedChildren = proposedGroup.getProcessGroups().stream()
                .collect(Collectors.toMap(VersionedProcessGroup::getIdentifier, Function.identity()));

        for (final ProcessGroup child : processGroup.getProcessGroups()) {
            final String childVersionedId = child.getVersionedComponentId().orElse(
                    NiFiRegistryFlowMapper.generateVersionedComponentId(child.getIdentifier()));
            final VersionedProcessGroup proposedChild = proposedChildren.get(childVersionedId);

            if (proposedChild != null) {
                // The child exists in both flows; verify its contents in turn
                verifyCanRemoveMissingComponents(child, proposedChild, verifyConnectionRemoval);
                continue;
            }

            // The child will be removed: neither it nor any of its descendants may have attached templates
            final Template attachedTemplate = findAllTemplates(child).stream().findFirst().orElse(null);
            if (attachedTemplate != null) {
                throw new IllegalStateException(this + " cannot be updated to the proposed flow because the child " + attachedTemplate.getProcessGroup()
                        + " that exists locally has one or more Templates, and the proposed flow does not contain these templates. "
                        + "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before re-attempting.");
            }

            if (!verifyConnectionRemoval) {
                continue;
            }

            // The removed child and its descendants may not hold any connection with queued data
            final Connection nonEmptyConnection = findAllConnections(child).stream()
                    .filter(candidate -> !candidate.getFlowFileQueue().isEmpty())
                    .findFirst()
                    .orElse(null);
            if (nonEmptyConnection != null) {
                throw new IllegalStateException(this + " cannot be updated to the proposed flow because the proposed flow "
                        + "does not contain a match for " + nonEmptyConnection + " and the connection currently has data in the queue.");
            }
        }
    }

    /**
     * Verifies that this Process Group can be saved to a Flow Registry under the given coordinates.
     * <p>
     * No descendant group may have local modifications. If this group is already under version control and a
     * flow identifier is given, that identifier, the bucket and the registry must all match the current
     * Version Control Information, and the group must not be stale with respect to the registry.
     *
     * @param registryId the identifier of the target Flow Registry
     * @param bucketId   the identifier of the target bucket
     * @param flowId     the identifier of the target flow; may be {@code null}
     * @param saveAction the save action requested; compared against {@code VersionedFlowDTO.COMMIT_ACTION}
     * @throws IllegalStateException if the group cannot be saved with the given coordinates
     */
    @Override
    public void verifyCanSaveToFlowRegistry(final String registryId, final String bucketId, final String flowId, final String saveAction) {
        verifyNoDescendantsWithLocalModifications("be saved to a Flow Registry");

        final StandardVersionControlInformation currentVci = versionControlInfo.get();

        // Nothing further to verify when the group is not under version control or no Flow ID was given
        if (currentVci == null || flowId == null) {
            return;
        }

        if (!flowId.equals(currentVci.getFlowIdentifier())) {
            // Flow ID is specified but different. This is not allowed, because Flow ID's are automatically generated,
            // and if the client is specifying an ID then it is either trying to assign the ID of the Flow or it is
            // attempting to save a new version of a different flow. Saving a new version of a different Flow is
            // not allowed because the Process Group must be in synch with the latest version of the flow before that
            // can be done.
            throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
                    + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
        }

        // Flow ID is the same, so the Process Group is to be published as the next version of the Flow.
        // That requires the Process Group to be 'current'.
        final VersionedFlowState currentState = currentVci.getStatus().getState();
        final boolean stale = currentState == VersionedFlowState.STALE;
        final boolean staleCommit = currentState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE && VersionedFlowDTO.COMMIT_ACTION.equals(saveAction);
        if (stale || staleCommit) {
            throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
                    + " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. "
                    + "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry.");
        }

        // The remaining coordinates must match as well: the bucket is checked first, then the registry
        if (!bucketId.equals(currentVci.getBucketIdentifier()) || !registryId.equals(currentVci.getRegistryIdentifier())) {
            throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
                    + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
        }
    }

    /**
     * Verifies that the local modifications of this Process Group can be reverted: the group must be under
     * version control and no descendant group may have local modifications.
     *
     * @throws IllegalStateException if the local modifications cannot be reverted
     */
    @Override
    public void verifyCanRevertLocalModifications() {
        final boolean underVersionControl = versionControlInfo.get() != null;
        if (!underVersionControl) {
            throw new IllegalStateException("Cannot revert local modifications to Process Group because the Process Group is not under Version Control.");
        }

        verifyNoDescendantsWithLocalModifications("have its local modifications reverted");
    }

    /**
     * Verifies that the local modifications of this Process Group can be shown. This implementation performs
     * no checks, so it never rejects the request.
     */
    @Override
    public void verifyCanShowLocalModifications() {

    }

    /**
     * Verifies that no process group returned by {@code findAllProcessGroups()} that is under version control
     * is locally modified or has failed to synchronize with the Flow Registry.
     *
     * @param action a description of the action being verified, inserted into the exception message after
     *               "Process Group cannot "
     * @throws IllegalStateException if such a group has local modifications or is in a sync-failure state
     */
    private void verifyNoDescendantsWithLocalModifications(final String action) {
        for (final ProcessGroup group : findAllProcessGroups()) {
            final VersionControlInformation groupVci = group.getVersionControlInformation();
            if (groupVci == null) {
                // Groups that are not under version control are not subject to these checks
                continue;
            }

            final VersionedFlowState state = groupVci.getStatus().getState();

            if (state == VersionedFlowState.LOCALLY_MODIFIED || state == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE) {
                throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and "
                        + "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before "
                        + "this action can be performed on the parent Process Group.");
            }

            if (state == VersionedFlowState.SYNC_FAILURE) {
                throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and "
                        + "is not synchronized with the Flow Registry. Each descendant Process Group must first be synchronized with the Flow Registry before this action can be "
                        + "performed on the parent Process Group. NiFi will continue to attempt to communicate with the Flow Registry periodically in the background.");
            }
        }
    }

    /**
     * Returns the FlowFile gate currently assigned to this Process Group.
     *
     * @return the current FlowFile gate
     */
    @Override
    public FlowFileGate getFlowFileGate() {
        // NOTE(review): read without taking the read lock, whereas setFlowFileConcurrency replaces the gate
        // under the write lock - confirm the field's visibility guarantees.
        return this.flowFileGate;
    }

    /**
     * Returns the FlowFile concurrency configured for this Process Group, read while holding the read lock.
     *
     * @return the configured FlowFile concurrency
     */
    @Override
    public FlowFileConcurrency getFlowFileConcurrency() {
        readLock.lock();
        try {
            return this.flowFileConcurrency;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Sets the FlowFile concurrency of this Process Group and installs the matching FlowFile gate. The update
     * happens under the write lock and is a no-op when the given value equals the current one.
     *
     * @param flowFileConcurrency the new FlowFile concurrency
     */
    @Override
    public void setFlowFileConcurrency(final FlowFileConcurrency flowFileConcurrency) {
        writeLock.lock();
        try {
            final boolean changed = this.flowFileConcurrency != flowFileConcurrency;
            if (changed) {
                this.flowFileConcurrency = flowFileConcurrency;

                // Replace the gate with one that enforces the newly selected concurrency
                switch (flowFileConcurrency) {
                    case UNBOUNDED:
                        this.flowFileGate = new UnboundedFlowFileGate();
                        break;
                    case SINGLE_FLOWFILE_PER_NODE:
                        // The gate is handed a supplier that reports whether this group currently has no data queued
                        this.flowFileGate = new SingleConcurrencyFlowFileGate(() -> !isDataQueued());
                        break;
                }
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Indicates whether any connection in this Process Group, or in any of its child groups, has data queued.
     *
     * @return {@code true} if data is queued, {@code false} otherwise
     */
    @Override
    public boolean isDataQueued() {
        // Every connection is taken into account
        final Predicate<Connection> acceptAll = candidate -> true;
        return isDataQueued(acceptAll);
    }

    /**
     * Indicates whether data is queued for processing. Data is considered queued for processing if a connection
     * has data queued and the connection's destination is NOT an Output Port.
     *
     * @return {@code true} if data is queued for processing, {@code false} otherwise
     */
    @Override
    public boolean isDataQueuedForProcessing() {
        final Predicate<Connection> destinationIsNotOutputPort =
            candidate -> candidate.getDestination().getConnectableType() != ConnectableType.OUTPUT_PORT;
        return isDataQueued(destinationIsNotOutputPort);
    }

    /**
     * Indicates whether any connection of this Process Group that passes the given filter has a non-empty
     * queue, or whether any child group has data queued. The check is performed while holding the read lock.
     *
     * @param connectionFilter selects which of this group's own connections are considered
     * @return {@code true} if data is queued, {@code false} otherwise
     */
    private boolean isDataQueued(final Predicate<Connection> connectionFilter) {
        readLock.lock();
        try {
            // Connections rejected by the filter are skipped; the first accepted connection with data ends the search
            for (final Connection candidate : this.connections.values()) {
                if (connectionFilter.test(candidate) && !candidate.getFlowFileQueue().isEmpty()) {
                    return true;
                }
            }

            // Check if any child Process Group has data enqueued. Note that #isDataQueued is called here and NOT
            // #isDataQueuedForProcessing, regardless of which of the two this method was called from. This is because
            // if data is queued up for the Output Port of a child group, it is still considered to be data that is
            // being processed by this Process Group.
            return this.processGroups.values().stream()
                .anyMatch(childGroup -> childGroup.isDataQueued());
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the FlowFile outbound policy currently set on this Process Group.
     *
     * @return the current FlowFile outbound policy
     */
    @Override
    public FlowFileOutboundPolicy getFlowFileOutboundPolicy() {
        return this.flowFileOutboundPolicy;
    }

    /**
     * Sets the FlowFile outbound policy of this Process Group. The value is stored as given, without
     * validation and without acquiring the group's locks.
     *
     * @param flowFileOutboundPolicy the new FlowFile outbound policy
     */
    @Override
    public void setFlowFileOutboundPolicy(final FlowFileOutboundPolicy flowFileOutboundPolicy) {
        this.flowFileOutboundPolicy = flowFileOutboundPolicy;
    }
}
